| 141 | | self._rmon = self._wmon = None |
| | 139 | self._rmon = None |
| | 140 | self._wmon = None |
| | 141 | |
| | 142 | |
| | 143 | @property |
| | 144 | def alive(self): |
| | 145 | """ |
| | 146 | Returns True if the fd exists and is open. |
| | 147 | """ |
| | 148 | # If the fd is closed, self._fd will be None. |
| | 149 | return self._fd != None |
| | 150 | |
| | 151 | |
| | 152 | @property |
| | 153 | def fileno(self): |
| | 154 | """ |
| | 155 | Returns the integer for this file descriptor, or None if no descriptor |
| | 156 | has been set. |
| | 157 | """ |
| | 158 | try: |
| | 159 | return self._fd.fileno() |
| | 160 | except AttributeError: |
| | 161 | return self._fd |
| | 162 | |
| | 163 | |
| | 164 | @property |
| | 165 | def chunk_size(self): |
| | 166 | """ |
| | 167 | Number of bytes to attempt to read from the fd at a time. The |
| | 168 | default is 1M. A 'read' signal is emitted for each chunk read from the |
| | 169 | fd. (The number of bytes read at a time may be less than the chunk |
| | 170 | size, but will never be more.) |
| | 171 | """ |
| | 172 | return self._chunk_size |
| | 173 | |
| | 174 | |
| | 175 | @chunk_size.setter |
| | 176 | def chunk_size(self, size): |
| | 177 | self._chunk_size = size |
| | 178 | |
| | 179 | |
| | 180 | # TODO: settable write_queue_max property |
| | 181 | |
| | 182 | @property |
| | 183 | def write_queue_size(self): |
| | 184 | """ |
| | 185 | Returns the number of bytes queued in memory to be written to the |
| | 186 | descriptor. |
| | 187 | """ |
| | 188 | # XXX: this is not terribly efficient when the write queue has |
| | 189 | # many elements. We may decide to keep a separate counter. |
| | 190 | return sum(len(data) for data, inprogress in self._write_queue) |
| | 191 | |
| | 192 | |
| | 193 | def _is_read_connected(self): |
| | 194 | """ |
| | 195 | Returns True if we're interested in read events. |
| | 196 | """ |
| | 197 | return not len(self._read_signal) == len(self._readline_signal) == \ |
| | 198 | len(self.signals['read']) == len(self.signals['readline']) == 0 |
| | 199 | |
| | 200 | |
| | 201 | def _update_read_monitor(self, signal=None, change=None): |
| | 202 | """ |
| | 203 | Update read IOMonitor to register or unregister based on if there are |
| | 204 | any handlers attached to the read signals. If there are no handlers, |
| | 205 | there is no point in reading data from the descriptor since it will go |
| | 206 | nowhere. This also allows us to push back the read buffer to the OS. |
| | 207 | |
| | 208 | We must call this immediately after reading a block, and not defer |
| | 209 | it until the end of the mainloop iteration via a timer in order not |
| | 210 | to lose incoming data between read() calls. |
| | 211 | """ |
| | 212 | if not (self._mode & IO_READ) or not self._rmon or change == Signal.SIGNAL_DISCONNECTED: |
| | 213 | return |
| | 214 | elif not self._is_read_connected(): |
| | 215 | self._rmon.unregister() |
| | 216 | elif not self._rmon.active(): |
| | 217 | self._rmon.register(self.fileno, IO_READ) |
| | 218 | |
| | 219 | |
| | 220 | def _set_non_blocking(self): |
| | 221 | """ |
| | 222 | Low-level call to set the fd non-blocking. Can be overridden by |
| | 223 | subclasses. |
| | 224 | """ |
| | 225 | flags = fcntl.fcntl(self.fileno, fcntl.F_GETFL) |
| | 226 | fcntl.fcntl(self.fileno, fcntl.F_SETFL, flags | os.O_NONBLOCK) |
| | 227 | |
| | 228 | |
| | 229 | def wrap(self, fd, mode): |
| | 230 | """ |
| | 231 | Wraps an existing file descriptor. |
| | 232 | """ |
| | 233 | self._fd = fd |
| | 234 | self._mode = mode |
| | 235 | if not fd: |
| | 236 | return |
| | 237 | self._set_non_blocking() |
| | 238 | |
| | 239 | if self._mode & IO_READ: |
| | 240 | if self._rmon: |
| | 241 | self._rmon.unregister() |
| | 242 | self._rmon = IOMonitor(self._handle_read) |
| | 243 | self._update_read_monitor() |
| | 244 | |
| | 245 | if self._mode & IO_WRITE: |
| | 246 | if self._wmon: |
| | 247 | self._wmon.unregister() |
| | 248 | self._wmon = IOMonitor(self._handle_write) |
| | 249 | if self._write_queue: |
| | 250 | self._wmon.register(self.fileno, IO_WRITE) |
| | 251 | |
| | 252 | # Disconnect socket and remove socket file (if unix socket) on shutdown |
| | 253 | main.signals['shutdown'].connect_weak(self.close) |
| | 254 | |
| | 255 | |
| | 256 | def _is_readable(self): |
| | 257 | """ |
| | 258 | Low-level call to read from fd. Can be overridden by subclasses. |
| | 259 | """ |
| | 260 | return self._fd != None |
| | 261 | |
| | 262 | |
| | 263 | def _async_read(self, signal): |
| | 264 | """ |
| | 265 | Common implementation for read() and readline(). |
| | 266 | """ |
| | 267 | if not (self._mode & IO_READ): |
| | 268 | raise IOError(9, 'Cannot read on a write-only descriptor') |
| | 269 | if not self._is_readable(): |
| | 270 | # fd is not readable. Return an InProgress pre-finished |
| | 271 | # with None |
| | 272 | return InProgress().finish(None) |
| | 273 | |
| | 274 | return inprogress(signal) |
| | 275 | |
| | 276 | |
| | 277 | def read(self): |
| | 278 | """ |
| | 279 | Reads a chunk of data from the fd. This function returns an |
| | 280 | InProgress object. If the InProgress is finished with None, it |
| | 281 | means that no data was collected and the fd closed. |
| | 282 | |
| | 283 | It is therefore possible to busy-loop by reading on a closed |
| | 284 | fd:: |
| | 285 | |
| | 286 | while True: |
| | 287 | fd.read().wait() |
| | 288 | |
| | 289 | So the return value of read() should be checked. Alternatively, |
| | 290 | fd.alive could be tested:: |
| | 291 | |
| | 292 | while fd.alive: |
| | 293 | fd.read().wait() |
| | 294 | |
| | 295 | """ |
| | 296 | return self._async_read(self._read_signal) |
| | 297 | |
| | 298 | |
| | 299 | def readline(self): |
| | 300 | """ |
| | 301 | Reads a line from the fd (with newline stripped). The function |
| | 302 | returns an InProgress object. If the InProgress is finished with None |
| | 303 | or the empty string, it means that no data was collected and the socket |
| | 304 | closed. |
| | 305 | """ |
| | 306 | return self._async_read(self._readline_signal) |
| | 307 | |
| | 308 | |
| | 309 | def _read(self, size): |
| | 310 | """ |
| | 311 | Low-level call to read from fd. Can be overridden by subclasses. |
| | 312 | """ |
| | 313 | return os.read(self.fileno, size) |
| | 314 | |
| | 315 | |
| | 316 | def _handle_read(self): |
| | 317 | """ |
| | 318 | IOMonitor callback when there is data to be read from the fd. |
| | 319 | |
| | 320 | This callback is only registered when we know the user is interested in |
| | 321 | reading data (by connecting to the read or readline signals, or calling |
| | 322 | read() or readline()). This is necessary for flow control. |
| | 323 | """ |
| | 324 | try: |
| | 325 | data = self._read(self._chunk_size) |
| | 326 | except (IOError, socket.error), (errno, msg): |
| | 327 | if errno == 11: |
| | 328 | # Resource temporarily unavailable -- we are trying to read |
| | 329 | # data on a socket when none is available. |
| | 330 | return |
| | 331 | # If we're here, then the socket is likely disconnected. |
| | 332 | data = None |
| | 333 | except: |
| | 334 | log.exception('%s._handle_read failed with unknown exception, closing socket', self.__class__.__name__) |
| | 335 | data = None |
| | 336 | |
| | 337 | # _read_signal is for InProgress objects waiting on the next read(). |
| | 338 | # For these we must emit even when data is None. |
| | 339 | self._read_signal.emit(data) |
| | 340 | |
| | 341 | if not data: |
| | 342 | self._readline_signal.emit(data) |
| | 343 | return self.close(immediate=True, expected=False) |
| | 344 | |
| | 345 | self.signals['read'].emit(data) |
| | 346 | # Update read monitor if necessary. If there are no longer any |
| | 347 | # callbacks left on any of the read signals (most likely _read_signal |
| | 348 | # or _readline_signal), we want to prevent _handle_read() from being |
| | 349 | # called, otherwise next time read() or readline() is called, we will |
| | 350 | # have lost that data. |
| | 351 | self._update_read_monitor() |
| | 352 | |
| | 353 | |
| | 354 | # TODO: parse input into separate lines and emit readline. |
| | 355 | |
| | 356 | |
| | 357 | def _write(self, data): |
| | 358 | """ |
| | 359 | Low-level call to write to the fd Can be overridden by subclasses. |
| | 360 | """ |
| | 361 | return os.write(self.fileno, data) |
| | 362 | |
| | 363 | |
| | 364 | def write(self, data): |
| | 365 | """ |
| | 366 | Writes the given data to the fd. This method returns an InProgress |
| | 367 | object which is finished when the given data is fully written to the |
| | 368 | file descriptor. |
| | 369 | |
| | 370 | It is not required that the descriptor be open in order to write to it. |
| | 371 | Written data is queued until the descriptor open and then flushed. As |
| | 372 | writes are asynchronous, all written data is queued. It is the |
| | 373 | caller's responsibility to ensure the internal write queue does not |
| | 374 | exceed the desired size by waiting for past write() InProgress to |
| | 375 | finish before writing more data. |
| | 376 | |
| | 377 | If a write does not complete because the file descriptor was closed |
| | 378 | prematurely, an IOError is thrown to the InProgress. |
| | 379 | """ |
| | 380 | if not (self._mode & IO_WRITE): |
| | 381 | raise IOError(9, 'Cannot write to a read-only descriptor') |
| | 382 | |
| | 383 | inprogress = InProgress() |
| | 384 | if data: |
| | 385 | self._write_queue.append((data, inprogress)) |
| | 386 | if self._fd and self._wmon and not self._wmon.active(): |
| | 387 | self._wmon.register(self.fileno, IO_WRITE) |
| | 388 | else: |
| | 389 | # We're writing the null string, nothing really to do. We're |
| | 390 | # implicitly done. |
| | 391 | inprogress.finish(0) |
| | 392 | return inprogress |
| | 393 | |
| | 394 | |
| | 395 | def _handle_write(self): |
| | 396 | """ |
| | 397 | IOMonitor callback when the fd is writable. This callback is not |
| | 398 | registered then the write queue is empty, so we only get called when |
| | 399 | there is something to write. |
| | 400 | """ |
| | 401 | if not self._write_queue: |
| | 402 | # Shouldn't happen; sanity check. |
| | 403 | return |
| | 404 | |
| | 405 | try: |
| | 406 | while self._write_queue: |
| | 407 | data, inprogress = self._write_queue.pop(0) |
| | 408 | sent = self._write(data) |
| | 409 | if sent != len(data): |
| | 410 | # Not all data was able to be sent; push remaining data |
| | 411 | # back onto the write buffer. |
| | 412 | self._write_queue.insert(0, (data[sent:], inprogress)) |
| | 413 | break |
| | 414 | else: |
| | 415 | # All data is written, finish the InProgress associated |
| | 416 | # with this write. |
| | 417 | inprogress.finish(sent) |
| | 418 | |
| | 419 | if not self._write_queue: |
| | 420 | if self._queue_close: |
| | 421 | return self.close(immediate=True) |
| | 422 | self._wmon.unregister() |
| | 423 | |
| | 424 | except (IOError, socket.error), (errno, msg): |
| | 425 | if errno == 11: |
| | 426 | # Resource temporarily unavailable -- we are trying to write |
| | 427 | # data to a socket when none is available. To prevent a busy |
| | 428 | # loop (notifier loop will keep calling us back) we sleep a |
| | 429 | # tiny bit. It's admittedly a bit kludgy, but it's a simple |
| | 430 | # solution to a condition which should not occur often. |
| | 431 | time.sleep(0.001) |
| | 432 | return |
| | 433 | |
| | 434 | # If we're here, then the socket is likely disconnected. |
| | 435 | self.close(immediate=True, expected=False) |
| | 436 | |
| | 437 | |
| | 438 | def _close(self): |
| | 439 | """ |
| | 440 | Low-level call to close the fd Can be overridden by subclasses. |
| | 441 | """ |
| | 442 | os.close(self.fileno) |
| | 443 | |
| | 444 | |
| | 445 | def close(self, immediate=False, expected=True): |
| | 446 | """ |
| | 447 | Closes the fd. If immediate is False and there is data in the |
| | 448 | write buffer, the fd is closed once the write buffer is emptied. |
| | 449 | Otherwise the fd is closed immediately and the 'closed' signal |
| | 450 | is emitted. |
| | 451 | """ |
| | 452 | if not immediate and self._write_queue: |
| | 453 | # Immediate close not requested and we have some data left |
| | 454 | # to be written, so defer close until after write queue |
| | 455 | # is empty. |
| | 456 | self._queue_close = True |
| | 457 | return |
| | 458 | |
| | 459 | if not self._rmon and not self._wmon: |
| | 460 | # already closed |
| | 461 | return |
| | 462 | |
| | 463 | if self._rmon: |
| | 464 | self._rmon.unregister() |
| | 465 | if self._wmon: |
| | 466 | self._wmon.unregister() |
| | 467 | self._rmon = None |
| | 468 | self._wmon = None |
| | 469 | self._queue_close = False |
| | 470 | |
| | 471 | # Throw IOError to any pending InProgress in the write queue |
| | 472 | for data, inprogress in self._write_queue: |
| | 473 | if len(inprogress): |
| | 474 | # Somebody cares about this InProgress, so we need to finish |
| | 475 | # it. |
| | 476 | inprogress.throw(IOError, IOError(9, "Descriptor closed prematurely"), None) |
| | 477 | del self._write_queue[:] |
| | 478 | |
| | 479 | self._close() |
| | 480 | self._fd = None |
| | 481 | |
| | 482 | self.signals['closed'].emit(expected) |
| | 483 | main.signals['shutdown'].disconnect(self.close) |
| | 484 | |
| | 485 | |
| | 486 | class SocketError(Exception): |
| | 487 | pass |
| | 488 | |
| | 489 | class Socket(IODescriptor): |
| | 490 | """ |
| | 491 | Notifier-aware socket class, implementing fully asynchronous reads |
| | 492 | and writes. |
| | 493 | """ |
| | 494 | |
| | 495 | def __init__(self, buffer_size=None, chunk_size=1024*1024): |
| | 496 | self._connecting = False |
| | 497 | self._addr = None |
| | 498 | self._listening = False |
| | 499 | self._buffer_size = buffer_size |
| | 500 | |
| | 501 | super(Socket, self).__init__(chunk_size=chunk_size) |
| | 502 | self.signals += ('new-client',) |
| | 503 | |