| 55 | | |
| 56 | | IO_READ = 1 |
| 57 | | IO_WRITE = 2 |
| 58 | | |
| 59 | | class IOMonitor(notifier.NotifierCallback): |
| 60 | | def __init__(self, callback, *args, **kwargs): |
| 61 | | """ |
| 62 | | Creates an IOMonitor to monitor IO activity. |
| 63 | | """ |
| 64 | | super(IOMonitor, self).__init__(callback, *args, **kwargs) |
| 65 | | self.set_ignore_caller_args() |
| 66 | | |
| 67 | | |
| 68 | | def register(self, fd, condition = IO_READ): |
| 69 | | """ |
| 70 | | Register the IOMonitor to a specific file descriptor |
| 71 | | @param fd: File descriptor or Python socket object |
| 72 | | @param condition: IO_READ or IO_WRITE |
| 73 | | """ |
| 74 | | if self.active(): |
| 75 | | if fd != self._id or condition != self._condition: |
| 76 | | raise ValueError('Existing file descriptor already registered with this IOMonitor.') |
| 77 | | return |
| 78 | | if not is_mainthread(): |
| 79 | | return MainThreadCallback(self.register)(fd, condition) |
| 80 | | notifier.socket_add(fd, self, condition-1) |
| 81 | | self._condition = condition |
| 82 | | # Must be called _id to correspond with base class. |
| 83 | | self._id = fd |
| 84 | | |
| 85 | | |
| 86 | | def unregister(self): |
| 87 | | """ |
| 88 | | Unregister the IOMonitor |
| 89 | | """ |
| 90 | | if not self.active(): |
| 91 | | return |
| 92 | | if not is_mainthread(): |
| 93 | | return MainThreadCallback(self.unregister)() |
| 94 | | notifier.socket_remove(self._id, self._condition-1) |
| 95 | | super(IOMonitor, self).unregister() |
| 96 | | |
| 97 | | |
| 98 | | |
| 99 | | class WeakIOMonitor(notifier.WeakNotifierCallback, IOMonitor): |
| 100 | | """ |
| 101 | | IOMonitor using weak references for the callback. |
| 102 | | """ |
| 103 | | pass |
| 104 | | |
| 105 | | |
| 106 | | # We need to import main for the signals dict (we add a handler to |
| 107 | | # shutdown to gracefully close sockets), but main itself imports us |
| 108 | | # for IOMonitor. So we must import main _after_ declaring IOMonitor, |
| 109 | | # instead of doing so at the top of the file. |
| 110 | | from kaa.notifier import main |
| 111 | | |
| 112 | | class IODescriptor(object): |
| 113 | | """ |
| 114 | | Base class for read-only, write-only or read-write descriptors such as |
| 115 | | Socket and Process. Implements logic common to communication over |
| 116 | | descriptors such as async read/writes and read/write buffering. |
| 117 | | """ |
| 118 | | def __init__(self, fd=None, mode=IO_READ|IO_WRITE, chunk_size=1024*1024): |
| 119 | | self.signals = Signals('closed', 'read', 'readline', 'write') |
| 120 | | self.wrap(fd, mode) |
| 121 | | self._write_queue = [] |
| 122 | | self._queue_close = False |
| 123 | | self._chunk_size = chunk_size |
| 124 | | |
| 125 | | # Internal signals for read() and readline() (these are different from |
| 126 | | # the same-named public signals as they get emitted even when data is |
| 127 | | # None. When these signals get updated, we call _update_read_monitor |
| 128 | | # to register the read IOMonitor. |
| 129 | | cb = WeakCallback(self._update_read_monitor) |
| 130 | | self._read_signal = Signal(cb) |
| 131 | | self._readline_signal = Signal(cb) |
| 132 | | self.signals['read'].changed_cb = cb |
| 133 | | self.signals['readline'].changed_cb = cb |
| 134 | | |
| 135 | | # These variables hold the IOMonitors for monitoring; we only allocate |
| 136 | | # a monitor when the fd is connected to avoid a ref cycle so that |
| 137 | | # disconnected fds will get properly deleted when they are not |
| 138 | | # referenced. |
| 139 | | self._rmon = None |
| 140 | | self._wmon = None |
| 141 | | |
| 142 | | |
| 143 | | @property |
| 144 | | def alive(self): |
| 145 | | """ |
| 146 | | Returns True if the fd exists and is open. |
| 147 | | """ |
| 148 | | # If the fd is closed, self._fd will be None. |
| 149 | | return self._fd != None |
| 150 | | |
| 151 | | |
| 152 | | @property |
| 153 | | def fileno(self): |
| 154 | | """ |
| 155 | | Returns the integer for this file descriptor, or None if no descriptor |
| 156 | | has been set. |
| 157 | | """ |
| 158 | | try: |
| 159 | | return self._fd.fileno() |
| 160 | | except AttributeError: |
| 161 | | return self._fd |
| 162 | | |
| 163 | | |
| 164 | | @property |
| 165 | | def chunk_size(self): |
| 166 | | """ |
| 167 | | Number of bytes to attempt to read from the fd at a time. The |
| 168 | | default is 1M. A 'read' signal is emitted for each chunk read from the |
| 169 | | fd. (The number of bytes read at a time may be less than the chunk |
| 170 | | size, but will never be more.) |
| 171 | | """ |
| 172 | | return self._chunk_size |
| 173 | | |
| 174 | | |
| 175 | | @chunk_size.setter |
| 176 | | def chunk_size(self, size): |
| 177 | | self._chunk_size = size |
| 178 | | |
| 179 | | |
| 180 | | # TODO: settable write_queue_max property |
| 181 | | |
| 182 | | @property |
| 183 | | def write_queue_size(self): |
| 184 | | """ |
| 185 | | Returns the number of bytes queued in memory to be written to the |
| 186 | | descriptor. |
| 187 | | """ |
| 188 | | # XXX: this is not terribly efficient when the write queue has |
| 189 | | # many elements. We may decide to keep a separate counter. |
| 190 | | return sum(len(data) for data, inprogress in self._write_queue) |
| 191 | | |
| 192 | | |
| 193 | | def _is_read_connected(self): |
| 194 | | """ |
| 195 | | Returns True if we're interested in read events. |
| 196 | | """ |
| 197 | | return not len(self._read_signal) == len(self._readline_signal) == \ |
| 198 | | len(self.signals['read']) == len(self.signals['readline']) == 0 |
| 199 | | |
| 200 | | |
| 201 | | def _update_read_monitor(self, signal=None, change=None): |
| 202 | | """ |
| 203 | | Update read IOMonitor to register or unregister based on if there are |
| 204 | | any handlers attached to the read signals. If there are no handlers, |
| 205 | | there is no point in reading data from the descriptor since it will go |
| 206 | | nowhere. This also allows us to push back the read buffer to the OS. |
| 207 | | |
| 208 | | We must call this immediately after reading a block, and not defer |
| 209 | | it until the end of the mainloop iteration via a timer in order not |
| 210 | | to lose incoming data between read() calls. |
| 211 | | """ |
| 212 | | if not (self._mode & IO_READ) or not self._rmon or change == Signal.SIGNAL_DISCONNECTED: |
| 213 | | return |
| 214 | | elif not self._is_read_connected(): |
| 215 | | self._rmon.unregister() |
| 216 | | elif not self._rmon.active(): |
| 217 | | self._rmon.register(self.fileno, IO_READ) |
| 218 | | |
| 219 | | |
| 220 | | def _set_non_blocking(self): |
| 221 | | """ |
| 222 | | Low-level call to set the fd non-blocking. Can be overridden by |
| 223 | | subclasses. |
| 224 | | """ |
| 225 | | flags = fcntl.fcntl(self.fileno, fcntl.F_GETFL) |
| 226 | | fcntl.fcntl(self.fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
| 227 | | |
| 228 | | |
| 229 | | def wrap(self, fd, mode): |
| 230 | | """ |
| 231 | | Wraps an existing file descriptor. |
| 232 | | """ |
| 233 | | self._fd = fd |
| 234 | | self._mode = mode |
| 235 | | if not fd: |
| 236 | | return |
| 237 | | self._set_non_blocking() |
| 238 | | |
| 239 | | if self._rmon: |
| 240 | | self._rmon.unregister() |
| 241 | | self._rmon = None |
| 242 | | if self._wmon: |
| 243 | | self._wmon.unregister() |
| 244 | | self._wmon = None |
| 245 | | |
| 246 | | if self._mode & IO_READ: |
| 247 | | self._rmon = IOMonitor(self._handle_read) |
| 248 | | self._update_read_monitor() |
| 249 | | if self._mode & IO_WRITE: |
| 250 | | self._wmon = IOMonitor(self._handle_write) |
| 251 | | if self._write_queue: |
| 252 | | self._wmon.register(self.fileno, IO_WRITE) |
| 253 | | |
| 254 | | # Disconnect socket and remove socket file (if unix socket) on shutdown |
| 255 | | main.signals['shutdown'].connect_weak(self.close) |
| 256 | | |
| 257 | | |
| 258 | | def _is_readable(self): |
| 259 | | """ |
| 260 | | Low-level call to read from fd. Can be overridden by subclasses. |
| 261 | | """ |
| 262 | | return self._fd != None |
| 263 | | |
| 264 | | |
| 265 | | def _async_read(self, signal): |
| 266 | | """ |
| 267 | | Common implementation for read() and readline(). |
| 268 | | """ |
| 269 | | if not (self._mode & IO_READ): |
| 270 | | raise IOError(9, 'Cannot read on a write-only descriptor') |
| 271 | | if not self._is_readable(): |
| 272 | | # fd is not readable. Return an InProgress pre-finished |
| 273 | | # with None |
| 274 | | return InProgress().finish(None) |
| 275 | | |
| 276 | | return inprogress(signal) |
| 277 | | |
| 278 | | |
| 279 | | def read(self): |
| 280 | | """ |
| 281 | | Reads a chunk of data from the fd. This function returns an |
| 282 | | InProgress object. If the InProgress is finished with None, it |
| 283 | | means that no data was collected and the fd closed. |
| 284 | | |
| 285 | | It is therefore possible to busy-loop by reading on a closed |
| 286 | | fd:: |
| 287 | | |
| 288 | | while True: |
| 289 | | fd.read().wait() |
| 290 | | |
| 291 | | So the return value of read() should be checked. Alternatively, |
| 292 | | fd.alive could be tested:: |
| 293 | | |
| 294 | | while fd.alive: |
| 295 | | fd.read().wait() |
| 296 | | |
| 297 | | """ |
| 298 | | return self._async_read(self._read_signal) |
| 299 | | |
| 300 | | |
| 301 | | def readline(self): |
| 302 | | """ |
| 303 | | Reads a line from the fd (with newline stripped). The function |
| 304 | | returns an InProgress object. If the InProgress is finished with None |
| 305 | | or the empty string, it means that no data was collected and the socket |
| 306 | | closed. |
| 307 | | """ |
| 308 | | return self._async_read(self._readline_signal) |
| 309 | | |
| 310 | | |
| 311 | | def _read(self, size): |
| 312 | | """ |
| 313 | | Low-level call to read from fd. Can be overridden by subclasses. |
| 314 | | """ |
| 315 | | return os.read(self.fileno, size) |
| 316 | | |
| 317 | | |
| 318 | | def _handle_read(self): |
| 319 | | """ |
| 320 | | IOMonitor callback when there is data to be read from the fd. |
| 321 | | |
| 322 | | This callback is only registered when we know the user is interested in |
| 323 | | reading data (by connecting to the read or readline signals, or calling |
| 324 | | read() or readline()). This is necessary for flow control. |
| 325 | | """ |
| 326 | | try: |
| 327 | | data = self._read(self._chunk_size) |
| 328 | | except (IOError, socket.error), (errno, msg): |
| 329 | | if errno == 11: |
| 330 | | # Resource temporarily unavailable -- we are trying to read |
| 331 | | # data on a socket when none is available. |
| 332 | | return |
| 333 | | # If we're here, then the socket is likely disconnected. |
| 334 | | data = None |
| 335 | | except: |
| 336 | | log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__) |
| 337 | | data = None |
| 338 | | |
| 339 | | # _read_signal is for InProgress objects waiting on the next read(). |
| 340 | | # For these we must emit even when data is None. |
| 341 | | self._read_signal.emit(data) |
| 342 | | |
| 343 | | if not data: |
| 344 | | self._readline_signal.emit(data) |
| 345 | | return self.close(immediate=True, expected=False) |
| 346 | | |
| 347 | | self.signals['read'].emit(data) |
| 348 | | # Update read monitor if necessary. If there are no longer any |
| 349 | | # callbacks left on any of the read signals (most likely _read_signal |
| 350 | | # or _readline_signal), we want to prevent _handle_read() from being |
| 351 | | # called, otherwise next time read() or readline() is called, we will |
| 352 | | # have lost that data. |
| 353 | | self._update_read_monitor() |
| 354 | | |
| 355 | | |
| 356 | | # TODO: parse input into separate lines and emit readline. |
| 357 | | |
| 358 | | |
| 359 | | def _write(self, data): |
| 360 | | """ |
| 361 | | Low-level call to write to the fd Can be overridden by subclasses. |
| 362 | | """ |
| 363 | | return os.write(self.fileno, data) |
| 364 | | |
| 365 | | |
| 366 | | def write(self, data): |
| 367 | | """ |
| 368 | | Writes the given data to the fd. This method returns an InProgress |
| 369 | | object which is finished when the given data is fully written to the |
| 370 | | file descriptor. |
| 371 | | |
| 372 | | It is not required that the descriptor be open in order to write to it. |
| 373 | | Written data is queued until the descriptor open and then flushed. As |
| 374 | | writes are asynchronous, all written data is queued. It is the |
| 375 | | caller's responsibility to ensure the internal write queue does not |
| 376 | | exceed the desired size by waiting for past write() InProgress to |
| 377 | | finish before writing more data. |
| 378 | | |
| 379 | | If a write does not complete because the file descriptor was closed |
| 380 | | prematurely, an IOError is thrown to the InProgress. |
| 381 | | """ |
| 382 | | if not (self._mode & IO_WRITE): |
| 383 | | raise IOError(9, 'Cannot write to a read-only descriptor') |
| 384 | | |
| 385 | | inprogress = InProgress() |
| 386 | | if data: |
| 387 | | self._write_queue.append((data, inprogress)) |
| 388 | | if self._fd and self._wmon and not self._wmon.active(): |
| 389 | | self._wmon.register(self.fileno, IO_WRITE) |
| 390 | | else: |
| 391 | | # We're writing the null string, nothing really to do. We're |
| 392 | | # implicitly done. |
| 393 | | inprogress.finish(0) |
| 394 | | return inprogress |
| 395 | | |
| 396 | | |
| 397 | | def _handle_write(self): |
| 398 | | """ |
| 399 | | IOMonitor callback when the fd is writable. This callback is not |
| 400 | | registered then the write queue is empty, so we only get called when |
| 401 | | there is something to write. |
| 402 | | """ |
| 403 | | if not self._write_queue: |
| 404 | | # Shouldn't happen; sanity check. |
| 405 | | return |
| 406 | | |
| 407 | | try: |
| 408 | | while self._write_queue: |
| 409 | | data, inprogress = self._write_queue.pop(0) |
| 410 | | sent = self._write(data) |
| 411 | | if sent != len(data): |
| 412 | | # Not all data was able to be sent; push remaining data |
| 413 | | # back onto the write buffer. |
| 414 | | self._write_queue.insert(0, (data[sent:], inprogress)) |
| 415 | | break |
| 416 | | else: |
| 417 | | # All data is written, finish the InProgress associated |
| 418 | | # with this write. |
| 419 | | inprogress.finish(sent) |
| 420 | | |
| 421 | | if not self._write_queue: |
| 422 | | if self._queue_close: |
| 423 | | return self.close(immediate=True) |
| 424 | | self._wmon.unregister() |
| 425 | | |
| 426 | | except (IOError, socket.error), (errno, msg): |
| 427 | | if errno == 11: |
| 428 | | # Resource temporarily unavailable -- we are trying to write |
| 429 | | # data to a socket when none is available. To prevent a busy |
| 430 | | # loop (notifier loop will keep calling us back) we sleep a |
| 431 | | # tiny bit. It's admittedly a bit kludgy, but it's a simple |
| 432 | | # solution to a condition which should not occur often. |
| 433 | | time.sleep(0.001) |
| 434 | | return |
| 435 | | |
| 436 | | # If we're here, then the socket is likely disconnected. |
| 437 | | self.close(immediate=True, expected=False) |
| 438 | | |
| 439 | | |
| 440 | | def _close(self): |
| 441 | | """ |
| 442 | | Low-level call to close the fd Can be overridden by subclasses. |
| 443 | | """ |
| 444 | | os.close(self.fileno) |
| 445 | | |
| 446 | | |
| 447 | | def close(self, immediate=False, expected=True): |
| 448 | | """ |
| 449 | | Closes the fd. If immediate is False and there is data in the |
| 450 | | write buffer, the fd is closed once the write buffer is emptied. |
| 451 | | Otherwise the fd is closed immediately and the 'closed' signal |
| 452 | | is emitted. |
| 453 | | """ |
| 454 | | if not immediate and self._write_queue: |
| 455 | | # Immediate close not requested and we have some data left |
| 456 | | # to be written, so defer close until after write queue |
| 457 | | # is empty. |
| 458 | | self._queue_close = True |
| 459 | | return |
| 460 | | |
| 461 | | if not self._rmon and not self._wmon: |
| 462 | | # already closed |
| 463 | | return |
| 464 | | |
| 465 | | if self._rmon: |
| 466 | | self._rmon.unregister() |
| 467 | | if self._wmon: |
| 468 | | self._wmon.unregister() |
| 469 | | self._rmon = None |
| 470 | | self._wmon = None |
| 471 | | self._queue_close = False |
| 472 | | |
| 473 | | # Throw IOError to any pending InProgress in the write queue |
| 474 | | for data, inprogress in self._write_queue: |
| 475 | | if len(inprogress): |
| 476 | | # Somebody cares about this InProgress, so we need to finish |
| 477 | | # it. |
| 478 | | inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None) |
| 479 | | del self._write_queue[:] |
| 480 | | |
| 481 | | self._close() |
| 482 | | self._fd = None |
| 483 | | |
| 484 | | self.signals['closed'].emit(expected) |
| 485 | | main.signals['shutdown'].disconnect(self.close) |
| 486 | | |