Changeset 3660

Show
Ignore:
Timestamp:
12/11/08 03:22:46 (2 months ago)
Author:
tack
Message:

Signals.add for convenience (self.signals += ('foo', 'bar'));
Socket.write() now returns InProgress?, which is finished when the data passed
to that write() call is actually written to the socket.

In preparation for Process rewrite, factor out logic that will eventually be
shared between Socket and Process into a common base class called IODescriptor.
This isn't terribly well tested, hopefully I didn't break much. :)

Location:
trunk/base/src/notifier
Files:
2 modified

Legend:

Unmodified
Added
Removed
  • trunk/base/src/notifier/signals.py

    r3587 r3660  
    315315 
    316316 
     317    def __add__(self, signals): 
     318        return Signals(self, *signals) 
     319 
     320 
    317321    def add(self, *signals): 
    318322        """ 
  • trunk/base/src/notifier/sockets.py

    r3645 r3660  
    3838import errno 
    3939import logging 
     40import time 
    4041 
    4142import nf_wrapper as notifier 
     
    4748from kaa.tmpfile import tempfile 
    4849 
     50# FIXME: this file is getting big enough: move IOMonitor and IODescriptor to 
     51# io.py, and leave Socket in this file. 
     52 
    4953# get logging object 
    5054log = logging.getLogger('notifier') 
    5155 
    52 IO_READ   = 0 
    53 IO_WRITE  = 1 
     56IO_READ   = 1 
     57IO_WRITE  = 2 
    5458 
    5559class IOMonitor(notifier.NotifierCallback): 
     
    6468    def register(self, fd, condition = IO_READ): 
    6569        """ 
    66         Register the IOMonitor to a specific socket 
     70        Register the IOMonitor to a specific file descriptor 
    6771        @param fd: File descriptor or Python socket object 
    6872        @param condition: IO_READ or IO_WRITE 
     
    7478        if not is_mainthread(): 
    7579            return MainThreadCallback(self.register)(fd, condition) 
    76         notifier.socket_add(fd, self, condition) 
     80        notifier.socket_add(fd, self, condition-1) 
    7781        self._condition = condition 
    7882        # Must be called _id to correspond with base class. 
     
    8892        if not is_mainthread(): 
    8993            return MainThreadCallback(self.unregister)() 
    90         notifier.socket_remove(self._id, self._condition) 
     94        notifier.socket_remove(self._id, self._condition-1) 
    9195        super(IOMonitor, self).unregister() 
    9296 
     
    102106# We need to import main for the signals dict (we add a handler to 
    103107# shutdown to gracefully close sockets), but main itself imports us 
    104 # for IOHandler.  So we must import main _after_ declaring IOHandler, 
     108# for IOMonitor.  So we must import main _after_ declaring IOMonitor, 
    105109# instead of doing so at the top of the file. 
    106110from kaa.notifier import main 
    107111 
    108 class SocketError(Exception): 
    109     pass 
    110  
    111 class Socket(object): 
     112class IODescriptor(object): 
    112113    """ 
    113     Notifier-aware socket class. 
     114    Base class for read-only, write-only or read-write descriptors such as 
     115    Socket and Process.  Implements logic common to communication over 
     116    descriptors such as async read/writes and read/write buffering. 
    114117    """ 
    115  
    116     def __init__(self, buffer_size=None, chunk_size=1024*1024): 
    117         self.signals = Signals('closed', 'read', 'readline', 'new-client') 
    118         self._socket = None 
    119         self._connecting = False 
    120         self._write_buffer = [] 
    121         self._addr = None 
    122         self._listening = False 
     118    def __init__(self, fd=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): 
     119        self.signals = Signals('closed', 'read', 'readline', 'write') 
     120        self.wrap(fd, mode) 
     121        self._write_queue = [] 
    123122        self._queue_close = False 
    124         self._buffer_size = buffer_size 
    125123        self._chunk_size = chunk_size 
    126  
     124     
    127125        # Internal signals for read() and readline()  (these are different from 
    128         # the public signals 'read' and 'readline' as they get emitted even 
    129         # when data is None.  When these signals get updated, we call 
    130         # _update_read_monitor to register the read IOMonitor. 
     126        # the same-named public signals as they get emitted even when data is 
     127        # None.  When these signals get updated, we call _update_read_monitor 
     128        # to register the read IOMonitor. 
    131129        cb = WeakCallback(self._update_read_monitor) 
    132130        self._read_signal = Signal(cb) 
     
    136134 
    137135        # These variables hold the IOMonitors for monitoring; we only allocate 
    138         # a monitor when the socket is connected to avoid a ref cycle so 
    139         # that disconnected sockets will get properly deleted when they are not 
     136        # a monitor when the fd is connected to avoid a ref cycle so that 
     137        # disconnected fds will get properly deleted when they are not 
    140138        # referenced. 
    141         self._rmon = self._wmon = None 
     139        self._rmon = None 
     140        self._wmon = None 
     141 
     142 
     143    @property 
     144    def alive(self): 
     145        """ 
     146        Returns True if the fd exists and is open. 
     147        """ 
     148        # If the fd is closed, self._fd will be None. 
     149        return self._fd != None 
     150 
     151 
     152    @property 
     153    def fileno(self): 
     154        """ 
     155        Returns the integer for this file descriptor, or None if no descriptor 
     156        has been set. 
     157        """ 
     158        try: 
     159            return self._fd.fileno() 
     160        except AttributeError: 
     161            return self._fd 
     162 
     163 
     164    @property 
     165    def chunk_size(self): 
     166        """ 
     167        Number of bytes to attempt to read from the fd at a time.  The 
     168        default is 1M.  A 'read' signal is emitted for each chunk read from the 
     169        fd.  (The number of bytes read at a time may be less than the chunk 
     170        size, but will never be more.) 
     171        """ 
     172        return self._chunk_size 
     173 
     174 
     175    @chunk_size.setter 
     176    def chunk_size(self, size): 
     177        self._chunk_size = size 
     178 
     179 
     180    # TODO: settable write_queue_max property 
     181 
     182    @property 
     183    def write_queue_size(self): 
     184        """ 
     185        Returns the number of bytes queued in memory to be written to the 
     186        descriptor. 
     187        """ 
     188        # XXX: this is not terribly efficient when the write queue has 
     189        # many elements.  We may decide to keep a separate counter. 
     190        return sum(len(data) for data, inprogress in self._write_queue) 
     191 
     192 
     193    def _is_read_connected(self): 
     194        """ 
     195        Returns True if we're interested in read events. 
     196        """ 
     197        return not len(self._read_signal) == len(self._readline_signal) == \ 
     198                   len(self.signals['read']) == len(self.signals['readline']) == 0 
     199         
     200 
     201    def _update_read_monitor(self, signal=None, change=None): 
     202        """ 
     203        Update read IOMonitor to register or unregister based on if there are 
     204        any handlers attached to the read signals.  If there are no handlers, 
     205        there is no point in reading data from the descriptor since it will go  
     206        nowhere.  This also allows us to push back the read buffer to the OS. 
     207 
     208        We must call this immediately after reading a block, and not defer 
     209        it until the end of the mainloop iteration via a timer in order not 
     210        to lose incoming data between read() calls. 
     211        """ 
     212        if not (self._mode & IO_READ) or not self._rmon or change == Signal.SIGNAL_DISCONNECTED: 
     213            return 
     214        elif not self._is_read_connected(): 
     215            self._rmon.unregister() 
     216        elif not self._rmon.active(): 
     217            self._rmon.register(self.fileno, IO_READ) 
     218 
     219 
     220    def _set_non_blocking(self): 
     221        """ 
     222        Low-level call to set the fd non-blocking.  Can be overridden by 
     223        subclasses. 
     224        """ 
     225        flags = fcntl.fcntl(self.fileno, fcntl.F_GETFL) 
     226        fcntl.fcntl(self.fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) 
     227 
     228 
     229    def wrap(self, fd, mode): 
     230        """ 
     231        Wraps an existing file descriptor. 
     232        """ 
     233        self._fd = fd 
     234        self._mode = mode 
     235        if not fd: 
     236            return 
     237        self._set_non_blocking() 
     238 
     239        if self._mode & IO_READ: 
     240            if self._rmon: 
     241                self._rmon.unregister() 
     242            self._rmon = IOMonitor(self._handle_read) 
     243            self._update_read_monitor() 
     244 
     245        if self._mode & IO_WRITE: 
     246            if self._wmon: 
     247                self._wmon.unregister() 
     248            self._wmon = IOMonitor(self._handle_write) 
     249            if self._write_queue: 
     250                self._wmon.register(self.fileno, IO_WRITE) 
     251 
     252        # Disconnect socket and remove socket file (if unix socket) on shutdown 
     253        main.signals['shutdown'].connect_weak(self.close) 
     254 
     255 
     256    def _is_readable(self): 
     257        """ 
     258        Low-level call to read from fd.  Can be overridden by subclasses. 
     259        """ 
     260        return self._fd != None 
     261 
     262 
     263    def _async_read(self, signal): 
     264        """ 
     265        Common implementation for read() and readline(). 
     266        """ 
     267        if not (self._mode & IO_READ): 
     268            raise IOError(9, 'Cannot read on a write-only descriptor') 
     269        if not self._is_readable(): 
     270            # fd is not readable.  Return an InProgress pre-finished 
     271            # with None 
     272            return InProgress().finish(None) 
     273 
     274        return inprogress(signal) 
     275 
     276 
     277    def read(self): 
     278        """ 
     279        Reads a chunk of data from the fd.  This function returns an  
     280        InProgress object.  If the InProgress is finished with None, it 
     281        means that no data was collected and the fd closed. 
     282 
     283        It is therefore possible to busy-loop by reading on a closed 
     284        fd:: 
     285 
     286            while True: 
     287                fd.read().wait() 
     288 
     289        So the return value of read() should be checked.  Alternatively, 
     290        fd.alive could be tested:: 
     291 
     292            while fd.alive: 
     293                fd.read().wait() 
     294 
     295        """ 
     296        return self._async_read(self._read_signal) 
     297 
     298 
     299    def readline(self): 
     300        """ 
     301        Reads a line from the fd (with newline stripped).  The function 
     302        returns an InProgress object.  If the InProgress is finished with None 
     303        or the empty string, it means that no data was collected and the socket 
     304        closed. 
     305        """ 
     306        return self._async_read(self._readline_signal) 
     307 
     308 
     309    def _read(self, size): 
     310        """ 
     311        Low-level call to read from fd.  Can be overridden by subclasses. 
     312        """ 
     313        return os.read(self.fileno, size) 
     314 
     315 
     316    def _handle_read(self): 
     317        """ 
     318        IOMonitor callback when there is data to be read from the fd. 
     319         
     320        This callback is only registered when we know the user is interested in 
     321        reading data (by connecting to the read or readline signals, or calling 
     322        read() or readline()).  This is necessary for flow control. 
     323        """ 
     324        try: 
     325            data = self._read(self._chunk_size) 
     326        except (IOError, socket.error), (errno, msg): 
     327            if errno == 11: 
     328                # Resource temporarily unavailable -- we are trying to read 
     329                # data on a socket when none is available. 
     330                return 
     331            # If we're here, then the socket is likely disconnected. 
     332            data = None 
     333        except: 
     334            log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__) 
     335            data = None 
     336 
     337        # _read_signal is for InProgress objects waiting on the next read(). 
     338        # For these we must emit even when data is None. 
     339        self._read_signal.emit(data) 
     340 
     341        if not data: 
     342            self._readline_signal.emit(data) 
     343            return self.close(immediate=True, expected=False) 
     344 
     345        self.signals['read'].emit(data) 
     346        # Update read monitor if necessary.  If there are no longer any 
     347        # callbacks left on any of the read signals (most likely _read_signal 
     348        # or _readline_signal), we want to prevent _handle_read() from being 
     349        # called, otherwise next time read() or readline() is called, we will 
     350        # have lost that data. 
     351        self._update_read_monitor() 
     352 
     353 
     354        # TODO: parse input into separate lines and emit readline. 
     355 
     356 
     357    def _write(self, data): 
     358        """ 
     359        Low-level call to write to the fd  Can be overridden by subclasses. 
     360        """ 
     361        return os.write(self.fileno, data) 
     362 
     363 
     364    def write(self, data): 
     365        """ 
     366        Writes the given data to the fd.  This method returns an InProgress 
     367        object which is finished when the given data is fully written to the 
     368        file descriptor. 
     369 
     370        It is not required that the descriptor be open in order to write to it. 
     371        Written data is queued until the descriptor open and then flushed.  As 
     372        writes are asynchronous, all written data is queued.  It is the 
     373        caller's responsibility to ensure the internal write queue does not 
     374        exceed the desired size by waiting for past write() InProgress to 
     375        finish before writing more data. 
     376 
     377        If a write does not complete because the file descriptor was closed 
     378        prematurely, an IOError is thrown to the InProgress. 
     379        """ 
     380        if not (self._mode & IO_WRITE): 
     381            raise IOError(9, 'Cannot write to a read-only descriptor') 
     382 
     383        inprogress = InProgress() 
     384        if data: 
     385            self._write_queue.append((data, inprogress)) 
     386            if self._fd and self._wmon and not self._wmon.active(): 
     387                self._wmon.register(self.fileno, IO_WRITE) 
     388        else: 
     389            # We're writing the null string, nothing really to do.  We're 
     390            # implicitly done. 
     391            inprogress.finish(0) 
     392        return inprogress 
     393 
     394 
     395    def _handle_write(self): 
     396        """ 
     397        IOMonitor callback when the fd is writable.  This callback is not 
     398        registered then the write queue is empty, so we only get called when 
     399        there is something to write. 
     400        """ 
     401        if not self._write_queue: 
     402            # Shouldn't happen; sanity check. 
     403            return 
     404 
     405        try: 
     406            while self._write_queue: 
     407                data, inprogress = self._write_queue.pop(0) 
     408                sent = self._write(data) 
     409                if sent != len(data): 
     410                    # Not all data was able to be sent; push remaining data 
     411                    # back onto the write buffer. 
     412                    self._write_queue.insert(0, (data[sent:], inprogress)) 
     413                    break 
     414                else: 
     415                    # All data is written, finish the InProgress associated 
     416                    # with this write. 
     417                    inprogress.finish(sent) 
     418 
     419            if not self._write_queue: 
     420                if self._queue_close: 
     421                    return self.close(immediate=True) 
     422                self._wmon.unregister() 
     423 
     424        except (IOError, socket.error), (errno, msg): 
     425            if errno == 11: 
     426                # Resource temporarily unavailable -- we are trying to write 
     427                # data to a socket when none is available.  To prevent a busy 
     428                # loop (notifier loop will keep calling us back) we sleep a 
     429                # tiny bit.  It's admittedly a bit kludgy, but it's a simple 
     430                # solution to a condition which should not occur often. 
     431                time.sleep(0.001) 
     432                return 
     433 
     434            # If we're here, then the socket is likely disconnected. 
     435            self.close(immediate=True, expected=False) 
     436 
     437 
     438    def _close(self): 
     439        """ 
     440        Low-level call to close the fd  Can be overridden by subclasses. 
     441        """ 
     442        os.close(self.fileno) 
     443 
     444 
     445    def close(self, immediate=False, expected=True): 
     446        """ 
     447        Closes the fd.  If immediate is False and there is data in the 
     448        write buffer, the fd is closed once the write buffer is emptied. 
     449        Otherwise the fd is closed immediately and the 'closed' signal 
     450        is emitted. 
     451        """ 
     452        if not immediate and self._write_queue: 
     453            # Immediate close not requested and we have some data left 
     454            # to be written, so defer close until after write queue 
     455            # is empty. 
     456            self._queue_close = True 
     457            return 
     458 
     459        if not self._rmon and not self._wmon: 
     460            # already closed 
     461            return 
     462 
     463        if self._rmon: 
     464            self._rmon.unregister() 
     465        if self._wmon: 
     466            self._wmon.unregister() 
     467        self._rmon = None 
     468        self._wmon = None 
     469        self._queue_close = False 
     470 
     471        # Throw IOError to any pending InProgress in the write queue 
     472        for data, inprogress in self._write_queue: 
     473            if len(inprogress): 
     474                # Somebody cares about this InProgress, so we need to finish 
     475                # it. 
     476                inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None) 
     477        del self._write_queue[:] 
     478 
     479        self._close() 
     480        self._fd = None 
     481 
     482        self.signals['closed'].emit(expected) 
     483        main.signals['shutdown'].disconnect(self.close) 
     484 
     485 
     486class SocketError(Exception): 
     487    pass 
     488 
     489class Socket(IODescriptor): 
     490    """ 
     491    Notifier-aware socket class, implementing fully asynchronous reads 
     492    and writes. 
     493    """ 
     494 
     495    def __init__(self, buffer_size=None, chunk_size=1024*1024): 
     496        self._connecting = False 
     497        self._addr = None 
     498        self._listening = False 
     499        self._buffer_size = buffer_size 
     500 
     501        super(Socket, self).__init__(chunk_size=chunk_size) 
     502        self.signals += ('new-client',) 
     503 
    142504 
    143505    def __repr__(self): 
    144         if not self._socket: 
     506        if not self._fd: 
    145507            return '<kaa.Socket - disconnected>' 
    146508        return '<kaa.Socket fd=%d>' % self.fileno 
     
    186548        try: 
    187549            # Will raise exception if socket is not connected. 
    188             self._socket.getpeername() 
     550            self._fd.getpeername() 
    189551            return True 
    190552        except: 
     
    216578    def buffer_size(self, size): 
    217579        self._buffer_size = size 
    218         if self._socket and size: 
    219             self._set_buffer_size(self._socket, size) 
    220  
    221  
    222     @property 
    223     def chunk_size(self): 
    224         """ 
    225         Number of bytes to attempt to read from the socket at a time.  The 
    226         default is 1M.  A 'read' signal is emitted for each chunk read from the 
    227         socket.  (The number of bytes read at a time may be less than the chunk 
    228         size, but will never be more.) 
    229         """ 
    230         return self._chunk_size 
    231  
    232  
    233     @chunk_size.setter 
    234     def chunk_size(self, size): 
    235         self._chunk_size = size 
    236  
    237  
    238     @property 
    239     def fileno(self): 
    240         """ 
    241         Returns the file descriptor of the socket, or None if the socket is 
    242         not connected. 
    243         """ 
    244         if not self._socket: 
    245             return None 
    246         return self._socket.fileno() 
     580        if self._fd and size: 
     581            self._set_buffer_size(self._fd, size) 
    247582 
    248583 
     
    254589        s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, size) 
    255590         
    256  
    257     def _update_read_monitor(self, signal = None, change = None): 
    258         """ 
    259         Update read IOMonitor to register or unregister based on if there are 
    260         any handlers attached to the read signals.  If there are no handlers, 
    261         there is no point in reading data from the socket since it will go  
    262         nowhere.  This also allows us to push back the read buffer to the OS. 
    263  
    264         We must call this immediately after reading a block, and not defer 
    265         it until the end of the mainloop iteration via a timer in order not 
    266         to lose incoming data between read() calls. 
    267         """ 
    268         if not self._rmon or change == Signal.SIGNAL_DISCONNECTED: 
    269             return 
    270         elif not self._listening and len(self._read_signal) == len(self._readline_signal) == \ 
    271                                      len(self.signals['read']) == len(self.signals['readline']) == 0: 
    272             self._rmon.unregister() 
    273         elif not self._rmon.active(): 
    274             self._rmon.register(self._socket, IO_READ) 
    275  
    276591 
    277592    def _normalize_address(self, addr): 
     
    297612 
    298613 
    299     def _make_socket(self, addr = None, overwrite = False): 
     614    def _make_socket(self, addr=None, overwrite=False): 
    300615        """ 
    301616        Constructs a socket based on the given addr.  Returns the socket and 
     
    337652 
    338653 
    339     def _replace_socket(self, sock, addr): 
    340         """ 
    341         Replaces the existing socket and address spec with the ones supplied. 
    342         Any existing socket is closed. 
    343         """ 
    344         if self._socket: 
    345             self._socket.close() 
    346  
    347         self._socket, self._addr = sock, addr 
    348  
    349  
    350     def listen(self, bind_info, qlen = 5): 
     654    def listen(self, bind_info, qlen=5): 
    351655        """ 
    352656        Sets the socket to listen on bind_info, which is either an integer 
     
    413717 
    414718 
    415     def wrap(self, sock, addr = None): 
     719    def wrap(self, sock, addr=None): 
    416720        """ 
    417721        Wraps an existing low-level socket object.  addr specifies the address 
    418722        corresponding to the socket. 
    419723        """ 
    420         self._socket = sock or self._socket 
     724        super(Socket, self).wrap(sock, IO_READ|IO_WRITE) 
    421725        self._addr = addr or self._addr 
    422         self._queue_close = False 
    423  
    424         sock.setblocking(False) 
     726 
    425727        if self._buffer_size: 
    426728            self._set_buffer_size(sock, self._buffer_size) 
    427729 
    428         if self._rmon: 
    429             self._rmon.unregister() 
    430             self._wmon.unregister() 
    431  
    432         self._rmon = IOMonitor(self._handle_read) 
    433         self._wmon = IOMonitor(self._handle_write) 
    434  
    435         self._update_read_monitor() 
    436         if self._write_buffer: 
    437             self._wmon.register(sock, IO_WRITE) 
    438  
    439         # Disconnect socket and remove socket file (if unix socket) on shutdown 
    440         main.signals['shutdown'].connect_weak(self.close) 
    441          
    442  
    443     def _async_read(self, signal): 
    444         if not self._socket and not self._connecting: 
    445             # Socket is closed.  Return an InProgress pre-finished 
    446             # with None 
    447             return InProgress().finish(None) 
    448  
    449         if self._listening: 
    450             raise SocketError("Can't read on a listening socket.") 
    451  
    452         return inprogress(signal) 
    453  
    454  
    455     def read(self): 
    456         """ 
    457         Reads a chunk of data from the socket.  This function returns an  
    458         InProgress object.  If the InProgress is finished with None, it 
    459         means that no data was collected and the socket closed. 
    460  
    461         It is therefore possible to busy-loop by reading on a closed 
    462         socket:: 
    463  
    464             while True: 
    465                 socket.read().wait() 
    466  
    467         So the return value of read() should be checked.  Alternatively, 
    468         socket.alive could be tested:: 
    469  
    470             while socket.alive: 
    471                 socket.read().wait() 
    472  
    473         """ 
    474         return self._async_read(self._read_signal) 
    475  
    476  
    477     def readline(self): 
    478         """ 
    479         Reads a line from the socket (with newline stripped).  The function 
    480         returns an InProgress object.  If the InProgress is finished with 
    481         None, it means that no data was collected and the socket closed. 
    482         """ 
    483         return self._async_read(self._readline_signal) 
     730 
     731    def _is_read_connected(self): 
     732        return self._listening or super(Socket, self)._is_read_connected() 
     733 
     734 
     735    def _set_non_blocking(self): 
     736        self._fd.setblocking(False) 
     737 
     738 
     739    def _is_readable(self): 
     740        return self._fd and not self._connecting 
     741 
     742 
     743    def _read(self, size): 
     744        return self._fd.recv(size) 
     745 
     746 
     747    def _write(self, data): 
     748        return self._fd.send(data) 
     749 
     750 
     751    def _close(self): 
     752        self._fd.close() 
    484753 
    485754 
     
    488757        Accept a new connection and return a new Socket object. 
    489758        """ 
    490         sock, addr = self._socket.accept() 
     759        sock, addr = self._fd.accept() 
    491760        # create new Socket from the same class this object is 
    492761        client_socket = self.__class__() 
     
    499768            return self._accept() 
    500769 
    501         try: 
    502             data = self._socket.recv(self._chunk_size) 
    503         except socket.error, (errno, msg): 
    504             if errno == 11: 
    505                 # Resource temporarily unavailable -- we are trying to read