Changeset 3662

Show
Ignore:
Timestamp:
13/11/08 03:06:46 (8 weeks ago)
Author:
tack
Message:

Move out non-socket stuff into io.py; several changes to IODescriptor
to accommodate Process rewrite.

Location:
trunk/base/src/notifier
Files:
1 added
3 modified

Legend:

Unmodified
Added
Removed
  • trunk/base/src/notifier/__init__.py

    r3468 r3662  
    7777 
    7878# IO/Socket handling 
    79 from sockets import IOMonitor, WeakIOMonitor, Socket, IO_READ, IO_WRITE 
     79from io import IOMonitor, WeakIOMonitor, IO_READ, IO_WRITE 
     80from sockets import Socket, SocketError 
    8081 
    8182# Event and event handler classes 
  • trunk/base/src/notifier/popen.py

    r3645 r3662  
    6262from thread import MainThreadCallback, is_mainthread 
    6363from async import InProgress, inprogress 
    64 from sockets import IOMonitor, IO_WRITE 
     64from io import IOMonitor, IO_WRITE 
    6565 
    6666# get logging object 
  • trunk/base/src/notifier/sockets.py

    r3661 r3662  
    11# -*- coding: iso-8859-1 -*- 
    22# ----------------------------------------------------------------------------- 
    3 # sockets.py - Socket (fd) classes for the notifier 
     3# sockets.py - Notifier-aware TCP/Unix Socket 
    44# ----------------------------------------------------------------------------- 
    55# $Id$ 
     
    3131# ----------------------------------------------------------------------------- 
    3232 
    33 __all__ = [ 'IOMonitor', 'WeakIOMonitor', 'Socket', 'IO_READ', 'IO_WRITE' ] 
     33__all__ = [ 'Socket', 'SocketError' ] 
    3434 
    3535import sys 
    3636import os 
    3737import socket 
    38 import errno 
    3938import logging 
    4039import time 
    4140 
    4241import nf_wrapper as notifier 
    43 from callback import WeakCallback 
    44 from signals import Signals, Signal 
    45 from thread import MainThreadCallback, is_mainthread, threaded 
    46 from async import InProgress, inprogress 
     42from thread import threaded 
     43from async import InProgress 
     44from io import IO_READ, IO_WRITE, IODescriptor 
    4745from kaa.utils import property 
    4846from kaa.tmpfile import tempfile 
    4947 
    50 # FIXME: this file is getting big enough: move IOMonitor and IODescriptor to 
    51 # io.py, and leave Socket in this file. 
    52  
    5348# get logging object 
    5449log = logging.getLogger('notifier') 
    55  
    56 IO_READ   = 1 
    57 IO_WRITE  = 2 
    58  
    59 class IOMonitor(notifier.NotifierCallback): 
    60     def __init__(self, callback, *args, **kwargs): 
    61         """ 
    62         Creates an IOMonitor to monitor IO activity. 
    63         """ 
    64         super(IOMonitor, self).__init__(callback, *args, **kwargs) 
    65         self.set_ignore_caller_args() 
    66  
    67  
    68     def register(self, fd, condition = IO_READ): 
    69         """ 
    70         Register the IOMonitor to a specific file descriptor 
    71         @param fd: File descriptor or Python socket object 
    72         @param condition: IO_READ or IO_WRITE 
    73         """ 
    74         if self.active(): 
    75             if fd != self._id or condition != self._condition: 
    76                 raise ValueError('Existing file descriptor already registered with this IOMonitor.') 
    77             return 
    78         if not is_mainthread(): 
    79             return MainThreadCallback(self.register)(fd, condition) 
    80         notifier.socket_add(fd, self, condition-1) 
    81         self._condition = condition 
    82         # Must be called _id to correspond with base class. 
    83         self._id = fd 
    84  
    85  
    86     def unregister(self): 
    87         """ 
    88         Unregister the IOMonitor 
    89         """ 
    90         if not self.active(): 
    91             return 
    92         if not is_mainthread(): 
    93             return MainThreadCallback(self.unregister)() 
    94         notifier.socket_remove(self._id, self._condition-1) 
    95         super(IOMonitor, self).unregister() 
    96  
    97  
    98  
    99 class WeakIOMonitor(notifier.WeakNotifierCallback, IOMonitor): 
    100     """ 
    101     IOMonitor using weak references for the callback. 
    102     """ 
    103     pass 
    104  
    105  
    106 # We need to import main for the signals dict (we add a handler to 
    107 # shutdown to gracefully close sockets), but main itself imports us 
    108 # for IOMonitor.  So we must import main _after_ declaring IOMonitor, 
    109 # instead of doing so at the top of the file. 
    110 from kaa.notifier import main 
    111  
    112 class IODescriptor(object): 
    113     """ 
    114     Base class for read-only, write-only or read-write descriptors such as 
    115     Socket and Process.  Implements logic common to communication over 
    116     descriptors such as async read/writes and read/write buffering. 
    117     """ 
    118     def __init__(self, fd=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): 
    119         self.signals = Signals('closed', 'read', 'readline', 'write') 
    120         self.wrap(fd, mode) 
    121         self._write_queue = [] 
    122         self._queue_close = False 
    123         self._chunk_size = chunk_size 
    124      
    125         # Internal signals for read() and readline()  (these are different from 
    126         # the same-named public signals as they get emitted even when data is 
    127         # None.  When these signals get updated, we call _update_read_monitor 
    128         # to register the read IOMonitor. 
    129         cb = WeakCallback(self._update_read_monitor) 
    130         self._read_signal = Signal(cb) 
    131         self._readline_signal = Signal(cb) 
    132         self.signals['read'].changed_cb = cb 
    133         self.signals['readline'].changed_cb = cb 
    134  
    135         # These variables hold the IOMonitors for monitoring; we only allocate 
    136         # a monitor when the fd is connected to avoid a ref cycle so that 
    137         # disconnected fds will get properly deleted when they are not 
    138         # referenced. 
    139         self._rmon = None 
    140         self._wmon = None 
    141  
    142  
    143     @property 
    144     def alive(self): 
    145         """ 
    146         Returns True if the fd exists and is open. 
    147         """ 
    148         # If the fd is closed, self._fd will be None. 
    149         return self._fd != None 
    150  
    151  
    152     @property 
    153     def fileno(self): 
    154         """ 
    155         Returns the integer for this file descriptor, or None if no descriptor 
    156         has been set. 
    157         """ 
    158         try: 
    159             return self._fd.fileno() 
    160         except AttributeError: 
    161             return self._fd 
    162  
    163  
    164     @property 
    165     def chunk_size(self): 
    166         """ 
    167         Number of bytes to attempt to read from the fd at a time.  The 
    168         default is 1M.  A 'read' signal is emitted for each chunk read from the 
    169         fd.  (The number of bytes read at a time may be less than the chunk 
    170         size, but will never be more.) 
    171         """ 
    172         return self._chunk_size 
    173  
    174  
    175     @chunk_size.setter 
    176     def chunk_size(self, size): 
    177         self._chunk_size = size 
    178  
    179  
    180     # TODO: settable write_queue_max property 
    181  
    182     @property 
    183     def write_queue_size(self): 
    184         """ 
    185         Returns the number of bytes queued in memory to be written to the 
    186         descriptor. 
    187         """ 
    188         # XXX: this is not terribly efficient when the write queue has 
    189         # many elements.  We may decide to keep a separate counter. 
    190         return sum(len(data) for data, inprogress in self._write_queue) 
    191  
    192  
    193     def _is_read_connected(self): 
    194         """ 
    195         Returns True if we're interested in read events. 
    196         """ 
    197         return not len(self._read_signal) == len(self._readline_signal) == \ 
    198                    len(self.signals['read']) == len(self.signals['readline']) == 0 
    199          
    200  
    201     def _update_read_monitor(self, signal=None, change=None): 
    202         """ 
    203         Update read IOMonitor to register or unregister based on if there are 
    204         any handlers attached to the read signals.  If there are no handlers, 
    205         there is no point in reading data from the descriptor since it will go  
    206         nowhere.  This also allows us to push back the read buffer to the OS. 
    207  
    208         We must call this immediately after reading a block, and not defer 
    209         it until the end of the mainloop iteration via a timer in order not 
    210         to lose incoming data between read() calls. 
    211         """ 
    212         if not (self._mode & IO_READ) or not self._rmon or change == Signal.SIGNAL_DISCONNECTED: 
    213             return 
    214         elif not self._is_read_connected(): 
    215             self._rmon.unregister() 
    216         elif not self._rmon.active(): 
    217             self._rmon.register(self.fileno, IO_READ) 
    218  
    219  
    220     def _set_non_blocking(self): 
    221         """ 
    222         Low-level call to set the fd non-blocking.  Can be overridden by 
    223         subclasses. 
    224         """ 
    225         flags = fcntl.fcntl(self.fileno, fcntl.F_GETFL) 
    226         fcntl.fcntl(self.fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) 
    227  
    228  
    229     def wrap(self, fd, mode): 
    230         """ 
    231         Wraps an existing file descriptor. 
    232         """ 
    233         self._fd = fd 
    234         self._mode = mode 
    235         if not fd: 
    236             return 
    237         self._set_non_blocking() 
    238  
    239         if self._rmon: 
    240             self._rmon.unregister() 
    241             self._rmon = None 
    242         if self._wmon: 
    243             self._wmon.unregister() 
    244             self._wmon = None 
    245  
    246         if self._mode & IO_READ: 
    247             self._rmon = IOMonitor(self._handle_read) 
    248             self._update_read_monitor() 
    249         if self._mode & IO_WRITE: 
    250             self._wmon = IOMonitor(self._handle_write) 
    251             if self._write_queue: 
    252                 self._wmon.register(self.fileno, IO_WRITE) 
    253  
    254         # Disconnect socket and remove socket file (if unix socket) on shutdown 
    255         main.signals['shutdown'].connect_weak(self.close) 
    256  
    257  
    258     def _is_readable(self): 
    259         """ 
    260         Low-level call to read from fd.  Can be overridden by subclasses. 
    261         """ 
    262         return self._fd != None 
    263  
    264  
    265     def _async_read(self, signal): 
    266         """ 
    267         Common implementation for read() and readline(). 
    268         """ 
    269         if not (self._mode & IO_READ): 
    270             raise IOError(9, 'Cannot read on a write-only descriptor') 
    271         if not self._is_readable(): 
    272             # fd is not readable.  Return an InProgress pre-finished 
    273             # with None 
    274             return InProgress().finish(None) 
    275  
    276         return inprogress(signal) 
    277  
    278  
    279     def read(self): 
    280         """ 
    281         Reads a chunk of data from the fd.  This function returns an  
    282         InProgress object.  If the InProgress is finished with None, it 
    283         means that no data was collected and the fd closed. 
    284  
    285         It is therefore possible to busy-loop by reading on a closed 
    286         fd:: 
    287  
    288             while True: 
    289                 fd.read().wait() 
    290  
    291         So the return value of read() should be checked.  Alternatively, 
    292         fd.alive could be tested:: 
    293  
    294             while fd.alive: 
    295                 fd.read().wait() 
    296  
    297         """ 
    298         return self._async_read(self._read_signal) 
    299  
    300  
    301     def readline(self): 
    302         """ 
    303         Reads a line from the fd (with newline stripped).  The function 
    304         returns an InProgress object.  If the InProgress is finished with None 
    305         or the empty string, it means that no data was collected and the socket 
    306         closed. 
    307         """ 
    308         return self._async_read(self._readline_signal) 
    309  
    310  
    311     def _read(self, size): 
    312         """ 
    313         Low-level call to read from fd.  Can be overridden by subclasses. 
    314         """ 
    315         return os.read(self.fileno, size) 
    316  
    317  
    318     def _handle_read(self): 
    319         """ 
    320         IOMonitor callback when there is data to be read from the fd. 
    321          
    322         This callback is only registered when we know the user is interested in 
    323         reading data (by connecting to the read or readline signals, or calling 
    324         read() or readline()).  This is necessary for flow control. 
    325         """ 
    326         try: 
    327             data = self._read(self._chunk_size) 
    328         except (IOError, socket.error), (errno, msg): 
    329             if errno == 11: 
    330                 # Resource temporarily unavailable -- we are trying to read 
    331                 # data on a socket when none is available. 
    332                 return 
    333             # If we're here, then the socket is likely disconnected. 
    334             data = None 
    335         except: 
    336             log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__) 
    337             data = None 
    338  
    339         # _read_signal is for InProgress objects waiting on the next read(). 
    340         # For these we must emit even when data is None. 
    341         self._read_signal.emit(data) 
    342  
    343         if not data: 
    344             self._readline_signal.emit(data) 
    345             return self.close(immediate=True, expected=False) 
    346  
    347         self.signals['read'].emit(data) 
    348         # Update read monitor if necessary.  If there are no longer any 
    349         # callbacks left on any of the read signals (most likely _read_signal 
    350         # or _readline_signal), we want to prevent _handle_read() from being 
    351         # called, otherwise next time read() or readline() is called, we will 
    352         # have lost that data. 
    353         self._update_read_monitor() 
    354  
    355  
    356         # TODO: parse input into separate lines and emit readline. 
    357  
    358  
    359     def _write(self, data): 
    360         """ 
    361         Low-level call to write to the fd  Can be overridden by subclasses. 
    362         """ 
    363         return os.write(self.fileno, data) 
    364  
    365  
    366     def write(self, data): 
    367         """ 
    368         Writes the given data to the fd.  This method returns an InProgress 
    369         object which is finished when the given data is fully written to the 
    370         file descriptor. 
    371  
    372         It is not required that the descriptor be open in order to write to it. 
    373         Written data is queued until the descriptor open and then flushed.  As 
    374         writes are asynchronous, all written data is queued.  It is the 
    375         caller's responsibility to ensure the internal write queue does not 
    376         exceed the desired size by waiting for past write() InProgress to 
    377         finish before writing more data. 
    378  
    379         If a write does not complete because the file descriptor was closed 
    380         prematurely, an IOError is thrown to the InProgress. 
    381         """ 
    382         if not (self._mode & IO_WRITE): 
    383             raise IOError(9, 'Cannot write to a read-only descriptor') 
    384  
    385         inprogress = InProgress() 
    386         if data: 
    387             self._write_queue.append((data, inprogress)) 
    388             if self._fd and self._wmon and not self._wmon.active(): 
    389                 self._wmon.register(self.fileno, IO_WRITE) 
    390         else: 
    391             # We're writing the null string, nothing really to do.  We're 
    392             # implicitly done. 
    393             inprogress.finish(0) 
    394         return inprogress 
    395  
    396  
    397     def _handle_write(self): 
    398         """ 
    399         IOMonitor callback when the fd is writable.  This callback is not 
    400         registered then the write queue is empty, so we only get called when 
    401         there is something to write. 
    402         """ 
    403         if not self._write_queue: 
    404             # Shouldn't happen; sanity check. 
    405             return 
    406  
    407         try: 
    408             while self._write_queue: 
    409                 data, inprogress = self._write_queue.pop(0) 
    410                 sent = self._write(data) 
    411                 if sent != len(data): 
    412                     # Not all data was able to be sent; push remaining data 
    413                     # back onto the write buffer. 
    414                     self._write_queue.insert(0, (data[sent:], inprogress)) 
    415                     break 
    416                 else: 
    417                     # All data is written, finish the InProgress associated 
    418                     # with this write. 
    419                     inprogress.finish(sent) 
    420  
    421             if not self._write_queue: 
    422                 if self._queue_close: 
    423                     return self.close(immediate=True) 
    424                 self._wmon.unregister() 
    425  
    426         except (IOError, socket.error), (errno, msg): 
    427             if errno == 11: 
    428                 # Resource temporarily unavailable -- we are trying to write 
    429                 # data to a socket when none is available.  To prevent a busy 
    430                 # loop (notifier loop will keep calling us back) we sleep a 
    431                 # tiny bit.  It's admittedly a bit kludgy, but it's a simple 
    432                 # solution to a condition which should not occur often. 
    433                 time.sleep(0.001) 
    434                 return 
    435  
    436             # If we're here, then the socket is likely disconnected. 
    437             self.close(immediate=True, expected=False) 
    438  
    439  
    440     def _close(self): 
    441         """ 
    442         Low-level call to close the fd  Can be overridden by subclasses. 
    443         """ 
    444         os.close(self.fileno) 
    445  
    446  
    447     def close(self, immediate=False, expected=True): 
    448         """ 
    449         Closes the fd.  If immediate is False and there is data in the 
    450         write buffer, the fd is closed once the write buffer is emptied. 
    451         Otherwise the fd is closed immediately and the 'closed' signal 
    452         is emitted. 
    453         """ 
    454         if not immediate and self._write_queue: 
    455             # Immediate close not requested and we have some data left 
    456             # to be written, so defer close until after write queue 
    457             # is empty. 
    458             self._queue_close = True 
    459             return 
    460  
    461         if not self._rmon and not self._wmon: 
    462             # already closed 
    463             return 
    464  
    465         if self._rmon: 
    466             self._rmon.unregister() 
    467         if self._wmon: 
    468             self._wmon.unregister() 
    469         self._rmon = None 
    470         self._wmon = None 
    471         self._queue_close = False 
    472  
    473         # Throw IOError to any pending InProgress in the write queue 
    474         for data, inprogress in self._write_queue: 
    475             if len(inprogress): 
    476                 # Somebody cares about this InProgress, so we need to finish 
    477                 # it. 
    478                 inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None) 
    479         del self._write_queue[:] 
    480  
    481         self._close() 
    482         self._fd = None 
    483  
    484         self.signals['closed'].emit(expected) 
    485         main.signals['shutdown'].disconnect(self.close) 
    486  
    48750 
    48851class SocketError(Exception): 
     
    751314 
    752315 
    753     def _close(self): 
    754         self._fd.close() 
    755  
    756  
    757316    def _accept(self): 
    758317        """