返回目录


解读Twisted的reactor(2)

1. 解读Twisted的reactor(2)

Twisted框架不只是想到了跨操作系统、跨进程/线程,还考虑到了跨语言的Jython和Cython。这一点会让所有python开发人员受益。先来看看在Cython下,Twisted的reactor的初始化:

from twisted.internet import default

不只是import了一个module,它所做的事比我们想像的远要多的多呢。我们来看看import时它所做的事:

   1 from bisect import insort
   2 from time import time, sleep
   3 import os
   4 import socket
   5 import sys
   6 import warnings
   7 
   8 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorUNIX, IReactorUNIXDatagram
   9 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary
  10 from twisted.internet.interfaces import IReactorProcess, IReactorFDSet, IReactorMulticast
  11 from twisted.internet import main, error, protocol, interfaces
  12 from twisted.internet import tcp, udp, defer
  13 
  14 from twisted.python import log, threadable, failure
  15 from twisted.persisted import styles
  16 from twisted.python.runtime import platformType, platform
  17 
  18 from twisted.internet.base import ReactorBase

引用了所有的所需要使用的module

   1 try:
   2     from twisted.internet import ssl
   3     sslEnabled = True
   4 except ImportError:
   5     sslEnabled = False
   6 
   7 try:
   8     from twisted.internet import unix
   9     unixEnabled = True
  10 except ImportError:
  11     unixEnabled = False

导入ssl和unix两个module。

   1 from main import CONNECTION_LOST
   2 
   3 if platformType != 'java':
   4     import select
   5     from errno import EINTR, EBADF
   6 
   7 if platformType == 'posix':
   8     import process
   9 
  10 if platformType == "win32":
  11     try:
  12         import win32process
  13     except ImportError:
  14         win32process = None

根据操作系统的不同引入不同的线程、进程的操作库。注意,win32process等这些特别的平台的module,都需要到site-packages下去找。

   1 if platform.getType() == 'posix':
   2     _Waker = _UnixWaker
   3 elif platform.getType() == 'win32':
   4     _Waker = _Win32Waker
   5 
   6 
   7 # global state for selector
   8 reads = {}
   9 writes = {}

识别不同的操作系统,来统一一个Socket操作方法。注意,就是在这里,我们统一使用一个_Waker,但是可以使用不同操作系统的socket操作。 这是win32的初始化:

   1     def __init__(self, reactor):
   2         """Initialize.
   3         """
   4         log.msg("starting waker")
   5         self.reactor = reactor
   6         # Following select_trigger (from asyncore)'s example;
   7         server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   8         client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
   9         client.setsockopt(socket.IPPROTO_TCP, 1, 1)
  10         server.bind(('127.0.0.1', 0))
  11         server.listen(1)
  12         client.connect(server.getsockname())
  13         reader, clientaddr = server.accept()
  14         client.setblocking(1)
  15         reader.setblocking(0)
  16         self.r = reader
  17         self.w = client
  18         self.fileno = self.r.fileno

而unix的操作初始化显然简单的多:

   1     def __init__(self, reactor):
   2         """Initialize.
   3         """
   4         self.reactor = reactor
   5         i, o = os.pipe()
   6         self.i = os.fdopen(i,'r')
   7         self.o = os.fdopen(o,'w')
   8         self.fileno = self.i.fileno

原因很简单,我们完全可以针对平台的不同,使用不同的优化方法来进行socket操作。

   1 if platform.getType() == "win32":
   2     _select = win32select
   3 else:
   4     _select = select.select
   5 
   6 # Exceptions that doSelect might return frequently
   7 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
   8 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')

使用select进行socket操作显然是最经济的,它可以让你启动最少的线程或进程就进行大量的并发连接。如果你一定要构建一个超容量的服务器,这会是最经济运行的基础。当然,由于操作系统不一样,你的select的处理也可能完全不同的。

下面再来看看reactor的最后一行

default.install()

做了点什么。

   1 def install():
   2     """Configure the twisted mainloop to be run using the select() reactor.
   3     """
   4     reactor = SelectReactor()
   5     main.installReactor(reactor)

它初始化了一个SelectReactor类。初始化会先执行ReactorBase的初始化方法:

   1     def __init__(self):
   2         self._eventTriggers = {}
   3         self._pendingTimedCalls = []
   4         self.running = 0
   5         self.waker = None
   6         self.resolver = None
   7         self.usingThreads = 0
   8         self.addSystemEventTrigger('during', 'shutdown', self.crash)
   9         self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
  10         threadable.whenThreaded(self.initThreads)

然后运行PosixReactorBase的初始化方法:

   1     def __init__(self):
   2         ReactorBase.__init__(self)
   3         if self.usingThreads or platformType == "posix":
   4             self.installWaker()

我们可以看到,这部分完全是应平台的不同在初始化select时所使用的各个变量及线程或是信号处理方法。

main.installReactor(reactor)

是最重要的,它将我们初始华好的reactor正名为twisted.internet.reactor让所有的人都能使用它:

   1 def installReactor(reactor):
   2     global addReader, addWriter, removeReader, removeWriter
   3     global iterate, addTimeout, wakeUp
   4     # this stuff should be common to all reactors.
   5     import twisted.internet
   6     import sys
   7     assert not sys.modules.has_key('twisted.internet.reactor'), \
           "reactor already installed"
   8     twisted.internet.reactor = reactor
   9     sys.modules['twisted.internet.reactor'] = reactor
  10 
  11     # install stuff for backwards compatability
  12 
  13     # IReactorCore
  14     if implements(reactor, IReactorCore):
  15         iterate = reactor.iterate
  16 
  17     # IReactorFDSet
  18     if implements(reactor, IReactorFDSet):
  19         addReader = reactor.addReader
  20         addWriter = reactor.addWriter
  21         removeWriter = reactor.removeWriter
  22         removeReader = reactor.removeReader
  23 
  24     # IReactorTime
  25     if implements(reactor, IReactorTime):
  26         def addTimeout(m, t, f=reactor.callLater):
  27             warnings.warn("main.addTimeout is deprecated, use reactor.callLater instead.")
  28             f(t, m)
  29 
  30     # ???
  31     if hasattr(reactor, "wakeUp"):
  32         wakeUp = reactor.wakeUp

仔细看看这两句:

   1     twisted.internet.reactor = reactor
   2     sys.modules['twisted.internet.reactor'] = reactor

它将我们初始化好的reactor放到了原来del的位置。


返回目录