Using Processes

1. Overview

Along with connecting to servers across the internet, Twisted also connects to local processes with much the same API. The API is described in more detail in the reference documentation for reactor.spawnProcess() and ProcessProtocol.


2. Running Another Process

Processes are run through the reactor, using reactor.spawnProcess(). Pipes are created to the child process and added to the reactor core, so the application will not block while sending data into or pulling data out of the new process. reactor.spawnProcess() requires two arguments, processProtocol and executable, and optionally takes seven more: args, env, path, uid, gid, usePTY, and childFDs.

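    import os
    from twisted.internet import reactor

    mypp = MyProcessProtocol()
    # executable, program, arg1, arg2, path, uid, gid, usePTY and childFDs
    # are placeholders for your own values.
    reactor.spawnProcess(mypp, executable, args=[program, arg1, arg2],
                         env={'HOME': os.environ['HOME']}, path=path,
                         uid=uid, gid=gid, usePTY=usePTY, childFDs=childFDs)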
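
One detail that is easy to miss in the call above: args is the child's complete argument vector, so its first element is conventionally the program name itself (as in the ['wc'] list used later in this document). The following is a minimal, standard-library-only sketch of assembling the executable, args, and a reduced env before handing them to reactor.spawnProcess(). The helper name build_spawn_arguments and its keep_env parameter are hypothetical and not part of Twisted.

```python
import os


def build_spawn_arguments(program, *arguments, keep_env=("HOME",)):
    """Return (executable, args, env) in the shape spawnProcess() expects.

    Hypothetical helper for illustration; it is not part of Twisted.
    """
    # args is the child's full argv, so the program name comes first.
    args = [program, *arguments]
    # Pass on only the environment variables that were explicitly requested.
    env = {name: os.environ[name] for name in keep_env if name in os.environ}
    return program, args, env


executable, args, env = build_spawn_arguments("wc", "-l")
print(executable, args)
```

The three values would then be passed on as reactor.spawnProcess(mypp, executable, args, env).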

3. Writing a ProcessProtocol

The ProcessProtocol you pass to spawnProcess is your interaction with the process. It has a very similar signature to a regular Protocol, but it has several extra methods to deal with events specific to a process. In our example, we will interface with 'wc' to create a word count of user-given text. First, we'll start by importing the required modules, and writing the initialization for our ProcessProtocol.


    from twisted.internet import protocol

    class WCProcessProtocol(protocol.ProcessProtocol):

        def __init__(self, text):
            self.text = text

When the ProcessProtocol is connected to the process, its connectionMade method is called. In our protocol, we write our text to the standard input of the process and then close standard input, to let the process know we are done writing to it.


        def connectionMade(self):
            self.transport.write(self.text)
            self.transport.closeStdin()


Data that the process writes to its standard output is delivered to outReceived, where we parse the three counts that wc prints.

        def outReceived(self, data):
            # wc prints three whitespace-separated counts: lines, words, characters.
            lines, words, chars = [int(field) for field in data.split()]
            self.transport.loseConnection()
            self.receiveCounts(lines, words, chars)

Now the protocol has parsed the output and ended the connection to the process. It then passes the results on to the final method, receiveCounts. This is for users of the class to override, so as to do other things with the data. For our demonstration, we will just print the results.


        def receiveCounts(self, lines, words, chars):
            print('Received counts from wc.')
            print('Lines:', lines)
            print('Words:', words)
            print('Characters:', chars)

We're done! To use our WCProcessProtocol, we create an instance, and pass it to spawnProcess.


    from twisted.internet import reactor

    # The text is sent to wc's stdin, so it is given as bytes.
    wcProcess = WCProcessProtocol(b"accessing protocols through Twisted is fun!\n")
    reactor.spawnProcess(wcProcess, 'wc', ['wc'])
    reactor.run()
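
The outReceived method above parses wc's output as soon as the first chunk arrives, which only works if the whole output shows up in a single call. A more defensive variant would collect the chunks and parse them once the output is complete. The sketch below shows just that accumulation step in plain Python. WCOutputParser is a hypothetical helper, not part of Twisted, and the sample chunks are made up to match the text used in this example.

```python
class WCOutputParser:
    """Collect wc's stdout in chunks and parse it once the stream is complete.

    Hypothetical helper for illustration; it is not part of Twisted.
    """

    def __init__(self):
        self._chunks = []

    def feed(self, data):
        # Intended to be called from outReceived(); just remember the chunk.
        self._chunks.append(data)

    def counts(self):
        # Intended to be called from outConnectionLost() or processEnded().
        fields = b"".join(self._chunks).split()
        lines, words, chars = (int(field) for field in fields[:3])
        return lines, words, chars


parser = WCOutputParser()
for chunk in (b"      1       6", b"      44\n"):  # output split across two calls
    parser.feed(chunk)
print(parser.counts())
```

A protocol would call feed() from outReceived and counts() from outConnectionLost or processEnded, so a split delivery no longer matters.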

4. Things that can happen to your ProcessProtocol

These are the methods that you can usefully override in your subclass of ProcessProtocol:


  • .connectionMade: This is called when the program is started, and makes a good place to write data into the stdin pipe (using self.transport.write()).
  • .outReceived(data): This is called with data that was received from the process' stdout pipe. Pipes tend to provide data in larger chunks than sockets (one kilobyte is a common buffer size), so you may not experience the random dribs and drabs behavior typical of network sockets, but you should still be prepared for the case where you don't get all your data in a single call. To do it properly, outReceived ought to simply accumulate the data and put off doing anything with it until the process has finished.
  • .errReceived(data): This is called with data from the process' stderr pipe. It behaves just like outReceived.
  • .inConnectionLost: This is called when the reactor notices that the process' stdin pipe has closed. Programs don't typically close their own stdin, so this will probably get called when your ProcessProtocol has shut down the write side with self.transport.loseConnection().
  • .outConnectionLost: This is called when the program closes its stdout pipe. This usually happens when the program terminates.
  • .errConnectionLost: Same as outConnectionLost, but for stderr instead of stdout.
  • .processEnded(status): This is called when the child process has been reaped, and receives information about the process' exit status. The status is passed in the form of a Failure instance, created with a .value that holds either a ProcessDone object if the process terminated normally (it died of natural causes instead of receiving a signal, and the exit code was 0), or a ProcessTerminated object (with an .exitCode attribute) if something went wrong. This scheme may seem a bit weird, but I trust that it proves useful when dealing with exceptions that occur in asynchronous code.

processEnded will always be called after inConnectionLost, outConnectionLost, and errConnectionLost are called.
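
The status handling in processEnded can be sketched as a small function that looks only at reason.value. This is an illustration under two assumptions: the helper name describe_exit is hypothetical, and it relies on the value also carrying a signal attribute (Twisted's ProcessTerminated does, although only exitCode is mentioned above). Plain stand-in objects are used instead of real Failure instances so the sketch runs without spawning anything.

```python
from types import SimpleNamespace


def describe_exit(reason):
    """Summarise the Failure passed to processEnded().

    Hypothetical helper; it only reads the exitCode and signal attributes
    found on reason.value.
    """
    value = reason.value
    signal = getattr(value, "signal", None)
    exit_code = getattr(value, "exitCode", None)
    if signal is not None:
        return "killed by signal %d" % signal
    if exit_code == 0:
        return "exited cleanly"
    return "exited with code %s" % exit_code


# Stand-ins for real Failure objects so the sketch runs without a child process.
clean = SimpleNamespace(value=SimpleNamespace(exitCode=0, signal=None))
failed = SimpleNamespace(value=SimpleNamespace(exitCode=2, signal=None))
killed = SimpleNamespace(value=SimpleNamespace(exitCode=None, signal=9))
for reason in (clean, failed, killed):
    print(describe_exit(reason))
```

In a real protocol the same function could be called as describe_exit(status) inside processEnded.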

The base-class definitions of these functions are all no-ops. This will result in all stdout and stderr being thrown away. Note that it is important for data you don't care about to be thrown away: if the pipe were not read, the child process would eventually block as it tried to write to a full pipe.
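
The "full pipe" mentioned above is easy to observe with the standard library alone. The sketch below (POSIX only, no Twisted involved; pipe_capacity is a hypothetical name) writes into a pipe that nobody reads until the kernel refuses more data. A child process using ordinary blocking writes would stall at exactly that point.

```python
import os


def pipe_capacity(chunk_size=4096):
    """Fill an unread pipe and return how many bytes it accepted.

    Standard-library illustration (POSIX only); Twisted is not involved.
    """
    read_fd, write_fd = os.pipe()
    os.set_blocking(write_fd, False)  # raise instead of blocking when full
    written = 0
    try:
        while True:
            written += os.write(write_fd, b"x" * chunk_size)
    except BlockingIOError:
        # The pipe is full; a blocking writer would stall here.
        pass
    finally:
        os.close(read_fd)
        os.close(write_fd)
    return written


print("pipe filled after", pipe_capacity(), "bytes")
```

The exact capacity depends on the operating system, which is why the reactor keeps draining pipes even when the protocol ignores the data.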

5. Things you can do from your ProcessProtocol

The following are the basic ways to control the child process:


  • self.transport.write(data): Stuff some data in the stdin pipe. Note that this write method will queue any data that can't be written immediately. Writing will resume in the future when the pipe becomes writable again.
  • self.transport.closeStdin: Close the stdin pipe. Programs which act as filters (reading from stdin, modifying the data, writing to stdout) usually take this as a sign that they should finish their job and terminate. For these programs, it is important to close stdin when you're done with it, otherwise the child process will never quit.
  • self.transport.closeStdout: Not usually called, since you're putting the process into a state where any attempt to write to stdout will cause a SIGPIPE error. This isn't a nice thing to do to the poor process.
  • self.transport.closeStderr: Not usually called, same reason as closeStdout.
  • self.transport.loseConnection: Close all three pipes.
  • os.kill(self.transport.pid, signal.SIGKILL): Kill the child process. This will eventually result in processEnded being called.

6. Verbose Example

Here is an example that is rather verbose about exactly when all the methods are called. It writes a number of lines into the wc program and then parses the output.


    #!/usr/bin/env python

    from twisted.internet import protocol
    from twisted.internet import reactor

    class MyPP(protocol.ProcessProtocol):
        def __init__(self, verses):
            self.verses = verses
            self.data = b""

        def connectionMade(self):
            print("connectionMade!")
            for i in range(self.verses):
                self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                     b"Aleph-null bottles of beer,\n" +
                                     b"Take one down and pass it around,\n" +
                                     b"Aleph-null bottles of beer on the wall.\n")
            # Close stdin only after every verse has been written.
            self.transport.closeStdin()  # tell them we're done

        def outReceived(self, data):
            print("outReceived! with %d bytes!" % len(data))
            self.data = self.data + data

        def errReceived(self, data):
            print("errReceived! with %d bytes!" % len(data))

        def inConnectionLost(self):
            print("inConnectionLost! stdin is closed! (we probably did it)")

        def outConnectionLost(self):
            print("outConnectionLost! The child closed their stdout!")
            # now is the time to examine what they wrote
            #print("I saw them write:", self.data)
            lines, words, chars = self.data.split()
            print("I saw %s lines" % lines.decode())

        def errConnectionLost(self):
            print("errConnectionLost! The child closed their stderr.")

        def processEnded(self, status_object):
            print("processEnded, status %d" % status_object.value.exitCode)
            print("quitting")
            reactor.stop()

    pp = MyPP(10)
    reactor.spawnProcess(pp, "wc", ["wc"], {})
    reactor.run()

The exact output of this program depends upon the relative timing of some un-synchronized events. In particular, the program may observe the child process close its stderr pipe before or after it reads data from the stdout pipe. One possible transcript would look like this:


% ./process.py
connectionMade!
inConnectionLost! stdin is closed! (we probably did it)
errConnectionLost! The child closed their stderr.
outReceived! with 24 bytes!
outConnectionLost! The child closed their stdout!
I saw 40 lines
processEnded, status 0
quitting
Main loop terminated.

7. Doing it the Easy Way

Frequently, one just needs a simple way to get all the output from a program. In the blocking world, you might use commands.getoutput from the standard library (subprocess.getoutput in Python 3), but using that in an event-driven program will cause everything else to stall until the command finishes. (In addition, the SIGCHLD handler used by that function does not play well with Twisted's own signal handling.) For these cases, the twisted.internet.utils.getProcessOutput function can be used. Here is a simple example:


    from twisted.internet import protocol, utils, reactor

    class FortuneQuoter(protocol.Protocol):

        fortune = '/usr/games/fortune'

        def connectionMade(self):
            output = utils.getProcessOutput(self.fortune)
            output.addCallbacks(self.writeResponse, self.noResponse)

        def writeResponse(self, resp):
            self.transport.write(resp)
            self.transport.loseConnection()

        def noResponse(self, err):
            self.transport.loseConnection()


    if __name__ == '__main__':
        f = protocol.Factory()
        f.protocol = FortuneQuoter
        reactor.listenTCP(10999, f)
        reactor.run()

If you only need the final exit code (like commands.getstatusoutput(cmd)[0]), the twisted.internet.utils.getProcessValue function is useful. Here is an example:


    from twisted.internet import utils, reactor

    def printTrueValue(val):
        print("/bin/true exits with rc=%d" % val)
        output = utils.getProcessValue('/bin/false')
        output.addCallback(printFalseValue)

    def printFalseValue(val):
        print("/bin/false exits with rc=%d" % val)
        reactor.stop()

    output = utils.getProcessValue('/bin/true')
    output.addCallback(printTrueValue)
    reactor.run()

7.1. ProcessProtocols with extra file descriptors

When you provide a childFDs dictionary with more than the normal three fds, you need additional methods to access those pipes. These methods are more generalized than the .outReceived ones described above. In fact, those methods (outReceived and errReceived) are actually just wrappers left in for compatibility with older code, written before this generalized fd mapping was implemented. The new list of things that can happen to your ProcessProtocol is as follows:


  • .connectionMade: This is called when the program is started.
  • .childDataReceived(childFD, data): This is called with data that was received from one of the process' output pipes (i.e. where the childFDs value was "r"). The actual file number (from the point of view of the child process) is in childFD. For compatibility, the default implementation of .childDataReceived dispatches to .outReceived or .errReceived when childFD is 1 or 2.
  • .childConnectionLost(childFD): This is called when the reactor notices that one of the process' pipes has been closed. This either means you have just closed down the parent's end of the pipe (with .transport.closeChildFD), the child closed the pipe explicitly (sometimes to indicate EOF), or the child process has terminated and the kernel has closed all of its pipes. The childFD argument tells you which pipe was closed. Note that you can only find out about file descriptors which were mapped to pipes: when they are mapped to existing fds the parent has no way to notice when they've been closed. For compatibility, the default implementation dispatches to .inConnectionLost, .outConnectionLost, or .errConnectionLost.
  • .processEnded(status): This is called when the child process has been reaped, and all pipes have been closed. This ensures that all data written by the child prior to its death will be received before .processEnded is invoked.
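
The compatibility dispatch described in the list above can be pictured with a small stand-in class. This is a simplified sketch written for this document, not Twisted's actual source: DispatchSketch and PerFDCollector are hypothetical names, and a real protocol would subclass ProcessProtocol instead. The first class shows how data for fds 1 and 2 still reaches the older callbacks. The second shows what overriding childDataReceived buys you when extra fds are mapped.

```python
from collections import defaultdict


class DispatchSketch:
    """Simplified stand-in for the compatibility dispatch; not Twisted's source."""

    def __init__(self):
        self.seen = []

    def childDataReceived(self, childFD, data):
        # Data for fd 1 and fd 2 still reaches the older callbacks.
        if childFD == 1:
            self.outReceived(data)
        elif childFD == 2:
            self.errReceived(data)

    def outReceived(self, data):
        self.seen.append(("out", data))

    def errReceived(self, data):
        self.seen.append(("err", data))


class PerFDCollector(DispatchSketch):
    """Override childDataReceived to keep one buffer per child fd."""

    def __init__(self):
        super().__init__()
        self.buffers = defaultdict(bytes)

    def childDataReceived(self, childFD, data):
        self.buffers[childFD] += data


legacy = DispatchSketch()
legacy.childDataReceived(1, b"to stdout")
legacy.childDataReceived(2, b"to stderr")
print(legacy.seen)

collector = PerFDCollector()
for fd, data in ((1, b"plain"), (4, b"status "), (1, b"text"), (4, b"line\n")):
    collector.childDataReceived(fd, data)
print(dict(collector.buffers))
```

Note that once childDataReceived is overridden without calling the parent implementation, outReceived and errReceived are no longer invoked.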

In addition to those methods, there are other methods available to influence the child process:


  • self.transport.writeToChild(childFD, data): Stuff some data into an input pipe. .write simply writes to childFD=0.
  • self.transport.closeChildFD(childFD): Close one of the child's pipes. Closing an input pipe is a common way to indicate EOF to the child process. Closing an output pipe is neither very friendly nor very useful.
  • os.kill(self.transport.pid, signal.SIGKILL): Kill the child process. This will eventually result in processEnded being called.

7.2. Example

GnuPG, the encryption program, can use additional file descriptors to accept a passphrase and emit status output. These are distinct from stdin (used to accept the crypttext), stdout (used to emit the plaintext), and stderr (used to emit human-readable status/warning messages). The passphrase FD reads until the pipe is closed and uses the resulting string to unlock the secret key that performs the actual decryption. The status FD emits machine-parseable status messages to indicate the validity of the signature, which key the message was encrypted to, etc.


gpg accepts command-line arguments to specify what these fds are, and then assumes that they have been opened by the parent before the gpg process is started. It simply performs reads and writes to these fd numbers.


To invoke gpg in decryption/verification mode, you would do something like the following:


    from twisted.internet import reactor
    from twisted.internet.defer import Deferred
    from twisted.internet.protocol import ProcessProtocol

    class GPGProtocol(ProcessProtocol):
        def __init__(self, crypttext, passphrase):
            # Both values are written to pipes, so they are expected as bytes.
            self.crypttext = crypttext
            self.passphrase = passphrase
            self.plaintext = b""
            self.status = b""

        def connectionMade(self):
            self.transport.writeToChild(3, self.passphrase)
            self.transport.closeChildFD(3)
            self.transport.writeToChild(0, self.crypttext)
            self.transport.closeChildFD(0)

        def childDataReceived(self, childFD, data):
            if childFD == 1: self.plaintext += data
            if childFD == 4: self.status += data

        def processEnded(self, status):
            rc = status.value.exitCode
            if rc == 0:
                self.deferred.callback(self)
            else:
                # Propagate the Failure so errbacks can inspect the exit status.
                self.deferred.errback(status)

    def decrypt(crypttext, passphrase):
        gp = GPGProtocol(crypttext, passphrase)
        gp.deferred = Deferred()
        cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
               "--batch"]
        p = reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                                 childFDs={0:"w", 1:"r", 2:2, 3:"w", 4:"r"})
        return gp.deferred

In this example, the status output could be parsed after the fact. It could, of course, be parsed on the fly, as it is a simple line-oriented protocol. Methods from LineReceiver could be mixed in to make this parsing more convenient.
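
Parsing the status output on the fly mostly comes down to buffering partial lines between calls. The sketch below shows that step in plain Python. StatusLineBuffer is a hypothetical helper, not part of Twisted (LineReceiver offers a fuller version of the same idea), and the two status lines are illustrative input only, not captured gpg output.

```python
class StatusLineBuffer:
    """Split a byte stream into complete lines as chunks arrive.

    Hypothetical helper for illustration; it is not part of Twisted.
    """

    def __init__(self):
        self._pending = b""

    def feed(self, data):
        # Return the lines completed by this chunk; keep any partial tail.
        *lines, self._pending = (self._pending + data).split(b"\n")
        return lines


buffer = StatusLineBuffer()
# Illustrative input only; real gpg status output will differ.
for chunk in (b"[GNUPG:] BEGIN_DECR", b"YPTION\n[GNUPG:] DECRYPTION_OKAY\n"):
    for line in buffer.feed(chunk):
        print(line.decode("ascii"))
```

In the protocol above, childDataReceived could pass the data for childFD 4 to feed() and act on each completed line instead of appending to self.status.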

The stderr mapping (2:2) used will cause any GPG errors to be emitted by the parent program, just as if those errors had occurred in the parent itself. This is sometimes desirable (it roughly corresponds to letting exceptions propagate upwards), especially if you do not expect to encounter errors in the child process and want them to be more visible to the end user. The alternative is to map stderr to a read-pipe and handle any such output from within the ProcessProtocol (roughly corresponding to catching the exception locally).
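
The two stderr choices differ only in one entry of the childFDs dictionary. The following sketch makes that explicit. gpg_child_fds and its capture_stderr flag are hypothetical names introduced for illustration, and the fd numbers are the ones used in the gpg example above.

```python
def gpg_child_fds(capture_stderr=False):
    """Build the childFDs mapping used for gpg in the example above.

    Hypothetical helper for illustration; it is not part of Twisted.
    """
    return {
        0: "w",  # stdin: the parent writes the crypttext
        1: "r",  # stdout: the parent reads the plaintext
        # stderr: read it through a pipe, or share the parent's own fd 2
        2: "r" if capture_stderr else 2,
        3: "w",  # passphrase fd
        4: "r",  # status fd
    }


print(gpg_child_fds())
print(gpg_child_fds(capture_stderr=True))
```

With capture_stderr=True, anything gpg writes to stderr would arrive in the protocol (through childDataReceived with childFD 2) rather than on the parent's own stderr.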


