返回目录 Twisted中的多任务处理探索 (1)

1. Twisted中的多任务处理探索 (1)

如果开发一个服务器的系统,你必然在线程和进程中徘徊选择。因为服务器运行在一个多用户的情况之下,多个用户的请求同时发起,那么我们就要在同一时间为这些用户进行数据处理并将结果返回给用户。你必然要选择将处理分到不同的线程或进程上去做处理,否则用户就只能序列的得到回应,而没有了并发的效果。

先来看看线程,对于twisted,它视不同的情况已经为我们提供了线程使用的框架,这在IReactorThreads接口中已经定义了。但是在IReactorThreads中定义了两种线程的使用方法:callFromThreadcallInThread。我是用了很久的时间才了解清楚它们的区别及用途。

先来看看callFromThread的一个示例:

   1 from twisted.internet import reactor
   2 from twisted.python import threadable
   3 threadable.init(1)
   4 
   5 def notThreadSafe(x):
   6      """do something that isn't thread-safe"""
   7      # ...
   8 
   9 def threadSafeScheduler():
  10     """Run in thread-safe manner."""
  11     reactor.callFromThread(notThreadSafe, 3) # will run 'notThreadSafe(3)'
  12                                              # in the event loop

这是Howto中一个比较经典的示例,但是为什么这样用,以及我们在什么场景下来使用callFromThread却实又讲的非常不清晰。我就用了最简单的办法来了解:分析它的代码流。

   1 from twisted.python import threadable
   2 threadable.init(1)

的工作看起来是做了要用的线程表的初始化。它的代码中我们需要关注的代码如下:

   1 _to_be_synched = []
   2 threaded = None
   3 ioThread = None
   4 threadCallbacks = []
   5 synchronize(_ThreadedWaiter)
   6 init(0)

这里,synchronize方法的代码如下:

   1 def synchronize(*klasses):
   2     """Make all methods listed in each class' synchronized attribute synchronized.
   3 
   4     The synchronized attribute should be a list of strings, consisting of the
   5     names of methods that must be synchronized. If we are running in threaded
   6     mode these methods will be wrapped with a lock.
   7     """
   8     global _to_be_synched
   9     if not threaded:
  10         map(_to_be_synched.append, klasses)
  11         return
  12 
  13     if threaded:
  14         import hook
  15         for klass in klasses:
  16 ##            hook.addPre(klass, '__init__', _synch_init)
  17             for methodName in klass.synchronized:
  18                 hook.addPre(klass, methodName, _synchPre)
  19                 hook.addPost(klass, methodName, _synchPost)

它先通过map(_to_be_synched.append, klasses)来将传入的_ThreadedWaiter加入到_to_be_synched列表中。然后将这个类的所有方法在执行前和执行后都加入同步处理。这里的hook的代码真的是非常有意思,很像aspect(面向方面的编程)中的断言切入。

最后的init(0)所做的就很简单了:

   1 def init(with_threads=1):
   2     """Initialize threading. Should be run once, at the beginning of program.
   3     """
   4     global threaded, _to_be_synched, Waiter
   5     global threadingmodule, threadmodule, XLock, _synchLockCreator
   6     if threaded == with_threads:
   7         return
   8     elif threaded:
   9         raise RuntimeError("threads cannot be disabled, once enabled")
  10     threaded = with_threads
  11     if threaded:
  12         log.msg('Enabling Multithreading.')
  13         import thread, threading
  14         threadmodule = thread
  15         threadingmodule = threading
  16         Waiter = _ThreadedWaiter
  17         XLock = _XLock
  18         _synchLockCreator = XLock()
  19         synchronize(*_to_be_synched)
  20         _to_be_synched = []
  21         for cb in threadCallbacks:
  22             cb()
  23     else:
  24         Waiter = _Waiter
  25         # Hack to allow XLocks to be unpickled on an unthreaded system.
  26         class DummyXLock:
  27             pass
  28         XLock = DummyXLock


返回目录