解读Twisted的reactor(2)
1. 解读Twisted的reactor(2)
Twisted框架不只是想到了跨操作系统、跨进程/线程,还考虑到了跨语言的Jython和Cython。这一点会让所有python开发人员受益。先来看看在Cython下,Twisted的reactor的初始化:
from twisted.internet import default
不只是import了一个module,它所做的事比我们想像的远要多的多呢。我们来看看import时它所做的事:
1 from bisect import insort
2 from time import time, sleep
3 import os
4 import socket
5 import sys
6 import warnings
7
8 from twisted.internet.interfaces import IReactorCore, IReactorTime, IReactorUNIX, IReactorUNIXDatagram
9 from twisted.internet.interfaces import IReactorTCP, IReactorUDP, IReactorSSL, IReactorArbitrary
10 from twisted.internet.interfaces import IReactorProcess, IReactorFDSet, IReactorMulticast
11 from twisted.internet import main, error, protocol, interfaces
12 from twisted.internet import tcp, udp, defer
13
14 from twisted.python import log, threadable, failure
15 from twisted.persisted import styles
16 from twisted.python.runtime import platformType, platform
17
18 from twisted.internet.base import ReactorBase
引用了所有的所需要使用的module
1 try:
2 from twisted.internet import ssl
3 sslEnabled = True
4 except ImportError:
5 sslEnabled = False
6
7 try:
8 from twisted.internet import unix
9 unixEnabled = True
10 except ImportError:
11 unixEnabled = False
导入ssl和unix两个module。
1 from main import CONNECTION_LOST
2
3 if platformType != 'java':
4 import select
5 from errno import EINTR, EBADF
6
7 if platformType == 'posix':
8 import process
9
10 if platformType == "win32":
11 try:
12 import win32process
13 except ImportError:
14 win32process = None
根据操作系统的不同引入不同的线程、进程的操作库。注意,win32process等这些特别的平台的module,都需要到site-packages下去找。
1 if platform.getType() == 'posix':
2 _Waker = _UnixWaker
3 elif platform.getType() == 'win32':
4 _Waker = _Win32Waker
5
6
7 # global state for selector
8 reads = {}
9 writes = {}
识别不同的操作系统,来统一一个Socket操作方法。注意,就是在这里,我们统一使用一个_Waker,但是可以使用不同操作系统的socket操作。 这是win32的初始化:
1 def __init__(self, reactor):
2 """Initialize.
3 """
4 log.msg("starting waker")
5 self.reactor = reactor
6 # Following select_trigger (from asyncore)'s example;
7 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
8 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
9 client.setsockopt(socket.IPPROTO_TCP, 1, 1)
10 server.bind(('127.0.0.1', 0))
11 server.listen(1)
12 client.connect(server.getsockname())
13 reader, clientaddr = server.accept()
14 client.setblocking(1)
15 reader.setblocking(0)
16 self.r = reader
17 self.w = client
18 self.fileno = self.r.fileno
而unix的操作初始化显然简单的多:
1 def __init__(self, reactor):
2 """Initialize.
3 """
4 self.reactor = reactor
5 i, o = os.pipe()
6 self.i = os.fdopen(i,'r')
7 self.o = os.fdopen(o,'w')
8 self.fileno = self.i.fileno
原因很简单,我们完全可以针对平台的不同,使用不同的优化方法来进行socket操作。
1 if platform.getType() == "win32":
2 _select = win32select
3 else:
4 _select = select.select
5
6 # Exceptions that doSelect might return frequently
7 _NO_FILENO = error.ConnectionFdescWentAway('Handler has no fileno method')
8 _NO_FILEDESC = error.ConnectionFdescWentAway('Filedescriptor went away')
使用select进行socket操作显然是最经济的,它可以让你启动最少的线程或进程就进行大量的并发连接。如果你一定要构建一个超容量的服务器,这会是最经济运行的基础。当然,由于操作系统不一样,你的select的处理也可能完全不同的。
下面再来看看reactor的最后一行
default.install()
做了点什么。
1 def install():
2 """Configure the twisted mainloop to be run using the select() reactor.
3 """
4 reactor = SelectReactor()
5 main.installReactor(reactor)
它初始化了一个SelectReactor类。初始化会先执行ReactorBase的初始化方法:
1 def __init__(self):
2 self._eventTriggers = {}
3 self._pendingTimedCalls = []
4 self.running = 0
5 self.waker = None
6 self.resolver = None
7 self.usingThreads = 0
8 self.addSystemEventTrigger('during', 'shutdown', self.crash)
9 self.addSystemEventTrigger('during', 'shutdown', self.disconnectAll)
10 threadable.whenThreaded(self.initThreads)
然后运行PosixReactorBase的初始化方法:
1 def __init__(self):
2 ReactorBase.__init__(self)
3 if self.usingThreads or platformType == "posix":
4 self.installWaker()
我们可以看到,这部分完全是应平台的不同在初始化select时所使用的各个变量及线程或是信号处理方法。
main.installReactor(reactor)
是最重要的,它将我们初始华好的reactor正名为twisted.internet.reactor让所有的人都能使用它:
1 def installReactor(reactor):
2 global addReader, addWriter, removeReader, removeWriter
3 global iterate, addTimeout, wakeUp
4 # this stuff should be common to all reactors.
5 import twisted.internet
6 import sys
7 assert not sys.modules.has_key('twisted.internet.reactor'), \
"reactor already installed"
8 twisted.internet.reactor = reactor
9 sys.modules['twisted.internet.reactor'] = reactor
10
11 # install stuff for backwards compatability
12
13 # IReactorCore
14 if implements(reactor, IReactorCore):
15 iterate = reactor.iterate
16
17 # IReactorFDSet
18 if implements(reactor, IReactorFDSet):
19 addReader = reactor.addReader
20 addWriter = reactor.addWriter
21 removeWriter = reactor.removeWriter
22 removeReader = reactor.removeReader
23
24 # IReactorTime
25 if implements(reactor, IReactorTime):
26 def addTimeout(m, t, f=reactor.callLater):
27 warnings.warn("main.addTimeout is deprecated, use reactor.callLater instead.")
28 f(t, m)
29
30 # ???
31 if hasattr(reactor, "wakeUp"):
32 wakeUp = reactor.wakeUp
仔细看看这两句:
1 twisted.internet.reactor = reactor
2 sys.modules['twisted.internet.reactor'] = reactor
它将我们初始化好的reactor放到了原来del的位置。