Using Processes

-- Last edited by Jerry Marx on [2004-08-18 18:28:47]

1. Overview

Along with connections to servers across the internet, Twisted also connects to local processes with much the same API. The API is described in more detail in the Twisted API documentation.

2. Running Another Process

Processes are run through the reactor, using reactor.spawnProcess(). Pipes are created to the child process and added to the reactor core, so that the application will not block while sending data into or pulling data out of the new process. reactor.spawnProcess() requires two arguments, processProtocol and executable, and optionally takes six more: arguments, environment, path, userID, groupID, and usePTY. The example below also passes childFDs, which section 7.1 describes.

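    import os
    from twisted.internet import reactor

    # MyProcessProtocol, executable, program, arg1, arg2, path, uid, gid,
    # usePTY and childFDs stand for values the application supplies.
    mypp = MyProcessProtocol()
    reactor.spawnProcess(mypp, executable, args=[program, arg1, arg2],
                         env={'HOME': os.environ['HOME']}, path=path,
                         uid=uid, gid=gid, usePTY=usePTY, childFDs=childFDs)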

3. Writing a ProcessProtocol

The ProcessProtocol you pass to spawnProcess() is your means of interacting with the process. Its signature is very similar to that of a regular Protocol, but it has several extra methods to deal with events specific to a process. In our example, we will interface with 'wc' to count the words in user-given text. We start by importing the required modules and writing the initializer for our ProcessProtocol.

    from twisted.internet import protocol

    class WCProcessProtocol(protocol.ProcessProtocol):

        def __init__(self, text):
            # text: the bytes that will be written to wc's standard input
            self.text = text

When the ProcessProtocol is connected to the process, its connectionMade() method is called. In our protocol, we write our text to the standard input of the process and then close standard input, to let the process know we are done writing to it.

        def connectionMade(self):
            # Send the text, then close stdin so that wc sees end-of-file.
            self.transport.write(self.text)
            self.transport.closeStdin()

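At this point the process has received the data, and it is time to read the results. Instead of arriving in dataReceived(), data from standard output is received in outReceived(). This distinguishes it from data arriving on standard error.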

        def outReceived(self, data):
            # Assumes wc printed three equal-width columns and that the whole
            # output arrived in this single call.
            fieldLength = len(data) // 3
            lines = int(data[:fieldLength])
            words = int(data[fieldLength:fieldLength*2])
            chars = int(data[fieldLength*2:])
            self.transport.loseConnection()
            self.receiveCounts(lines, words, chars)
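
The fixed-width slicing above depends on how wide wc makes its columns. As a hedged alternative, the counts can be split on whitespace instead. This is only a sketch: parse_wc_counts is a hypothetical helper name, not part of Twisted, and it assumes the complete output has already been collected.

```python
def parse_wc_counts(data):
    """Return (lines, words, chars) parsed from wc's output for stdin input."""
    # bytes.split() with no argument splits on any run of whitespace and
    # ignores leading and trailing whitespace, so column widths do not matter.
    fields = data.split()
    if len(fields) < 3:
        raise ValueError("expected three counts, got %r" % (data,))
    lines, words, chars = (int(field) for field in fields[:3])
    return lines, words, chars


if __name__ == "__main__":
    print(parse_wc_counts(b"      1       7      45\n"))
```

Inside outReceived(), the three slicing lines could then be replaced by a single call to this helper.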

Now the protocol has parsed the output and ended the connection to the process. It then passes the results on to the final method, receiveCounts(). This is for users of the class to override, so as to do other things with the data. For our demonstration, we will just print the results.

        def receiveCounts(self, lines, words, chars):
            print('Received counts from wc.')
            print('Lines:', lines)
            print('Words:', words)
            print('Characters:', chars)

We're done! To use our WCProcessProtocol, we create an instance and pass it to spawnProcess().

    from twisted.internet import reactor

    # The text is passed as bytes because it is written straight to a pipe.
    wcProcess = WCProcessProtocol(b"accessing protocols through Twisted is fun!\n")
    reactor.spawnProcess(wcProcess, 'wc', ['wc'])
    reactor.run()

4. Things that can happen to your ProcessProtocol

These are the methods that you can usefully override in your subclass of ProcessProtocol:

  • .connectionMade: This is called when the program is started, and makes a good place to write data into the stdin pipe (using self.transport.write()).
  • .outReceived(data): This is called with data that was received from the process' stdout pipe. Pipes tend to provide data in larger chunks than sockets (one kilobyte is a common buffer size), so you may not experience the random dribs and drabs typical of network sockets, but you should still be prepared for the case where you don't get all your data in a single call. To do it properly, outReceived ought to simply accumulate the data and put off doing anything with it until the process has finished.
  • .errReceived(data): This is called with data from the process' stderr pipe. It behaves just like outReceived.
  • .inConnectionLost: This is called when the reactor notices that the process' stdin pipe has closed. Programs don't typically close their own stdin, so this will probably get called when your ProcessProtocol has shut down the write side itself (for example with self.transport.closeStdin() or self.transport.loseConnection()).
  • .outConnectionLost: This is called when the program closes its stdout pipe. This usually happens when the program terminates.
  • .errConnectionLost: Same as outConnectionLost, but for stderr instead of stdout.
  • .processEnded(status): This is called when the child process has been reaped, and receives information about the process' exit status. The status is passed in the form of a Failure instance whose .value holds either a ProcessDone object if the process terminated normally (it died of natural causes instead of receiving a signal, and the exit code was 0), or a ProcessTerminated object (with an .exitCode attribute) if something went wrong. This scheme may seem a bit odd, but it proves useful when dealing with exceptions that occur in asynchronous code.
    • This is always called after inConnectionLost, outConnectionLost, and errConnectionLost have been called.
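
The advice to accumulate data in outReceived and process it only once the child has finished can be sketched as follows. This is a minimal illustration, not Twisted code: CountingCollector is a hypothetical stand-in that reuses the method names of a ProcessProtocol subclass so that it runs without a reactor, and the chunks fed to it simulate partial reads from a pipe.

```python
class CountingCollector:
    """Stand-in with the same method names as a ProcessProtocol subclass."""

    def __init__(self):
        self.chunks = []
        self.counts = None

    def outReceived(self, data):
        # May be called several times; just remember each chunk.
        self.chunks.append(data)

    def processEnded(self, status):
        # All pipes are closed by now, so the collected output is complete.
        output = b"".join(self.chunks)
        self.counts = tuple(int(field) for field in output.split())


collector = CountingCollector()
for chunk in (b"     40     2", b"80    15", b"60\n"):  # simulated partial reads
    collector.outReceived(chunk)
collector.processEnded(None)
print(collector.counts)
```

In a real protocol the same two method bodies would live in a subclass of protocol.ProcessProtocol, and the reactor would make the calls.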

The base-class definitions of these functions are all no-ops. This results in all stdout and stderr output being thrown away. Note that it is important for data you don't care about to be thrown away: if the pipe were not read, the child process would eventually block as it tried to write to a full pipe.

5. Things you can do from your ProcessProtocol

The following are the basic ways to control the child process:

  • self.transport.write(data): Stuff some data into the stdin pipe. Note that this write method will queue any data that can't be written immediately. Writing will resume in the future when the pipe becomes writable again.
  • self.transport.closeStdin: Close the stdin pipe. Programs which act as filters (reading from stdin, modifying the data, writing to stdout) usually take this as a sign that they should finish their job and terminate. For these programs, it is important to close stdin when you're done with it, otherwise the child process will never quit.
  • self.transport.closeStdout: Not usually called, since it puts the process into a state where any attempt to write to stdout will cause a SIGPIPE error. This isn't a nice thing to do to the poor process.
  • self.transport.closeStderr: Not usually called, for the same reason as closeStdout.
  • self.transport.loseConnection: Close all three pipes.
  • os.kill(self.transport.pid, signal.SIGKILL): Kill the child process. This will eventually result in processEnded being called.
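
To see what os.kill with SIGKILL does to a child, here is a small sketch that runs without a reactor. It is an illustration under stated assumptions: it uses the standard library's blocking subprocess module purely to obtain a child process and its pid (in a Twisted program the child would come from spawnProcess and the pid from self.transport.pid), kill_and_reap is a hypothetical name, and SIGKILL exists only on POSIX systems.

```python
import os
import signal
import subprocess
import sys


def kill_and_reap():
    """Start a sleeping child, kill it by pid, and return its exit status."""
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"])
    os.kill(child.pid, signal.SIGKILL)
    child.wait()  # reap the child so it does not linger as a zombie
    # On POSIX, a negative return code means "terminated by that signal".
    return child.returncode


if __name__ == "__main__":
    print(kill_and_reap())
```

A child killed this way did not die of natural causes, which corresponds to the ProcessTerminated case described for processEnded in section 4.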

6. Verbose Example

Here is an example that is rather verbose about exactly when all the methods are called. It writes a number of lines into the wc program and then parses the output.

    #!/usr/bin/env python3

    import re

    from twisted.internet import protocol
    from twisted.internet import reactor

    class MyPP(protocol.ProcessProtocol):
        def __init__(self, verses):
            self.verses = verses
            self.data = b""

        def connectionMade(self):
            print("connectionMade!")
            for i in range(self.verses):
                self.transport.write(b"Aleph-null bottles of beer on the wall,\n" +
                                     b"Aleph-null bottles of beer,\n" +
                                     b"Take one down and pass it around,\n" +
                                     b"Aleph-null bottles of beer on the wall.\n")
            # Close stdin only after every verse has been written.
            self.transport.closeStdin()  # tell them we're done

        def outReceived(self, data):
            print("outReceived! with %d bytes!" % len(data))
            self.data = self.data + data

        def errReceived(self, data):
            print("errReceived! with %d bytes!" % len(data))

        def inConnectionLost(self):
            print("inConnectionLost! stdin is closed! (we probably did it)")

        def outConnectionLost(self):
            print("outConnectionLost! The child closed their stdout!")
            # now is the time to examine what they wrote
            #print("I saw them write:", self.data)
            (dummy, lines, words, chars, file) = re.split(r'\s+', self.data.decode())
            print("I saw %s lines" % lines)

        def errConnectionLost(self):
            print("errConnectionLost! The child closed their stderr.")

        def processEnded(self, status_object):
            print("processEnded, status %d" % status_object.value.exitCode)
            print("quitting")
            reactor.stop()

    pp = MyPP(10)
    reactor.spawnProcess(pp, "wc", ["wc"], {})
    reactor.run()

The exact output of this program depends upon the relative timing of some unsynchronized events. In particular, the program may observe the child process closing its stderr pipe before or after it reads data from the stdout pipe. One possible transcript looks like this:

% ./process.py 
connectionMade!
inConnectionLost! stdin is closed! (we probably did it)
errConnectionLost! The child closed their stderr.
outReceived! with 24 bytes!
outConnectionLost! The child closed their stdout!
I saw 40 lines
processEnded, status 0
quitting
Main loop terminated.
% 
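
The processEnded handler above prints status_object.value.exitCode directly. A slightly more defensive summary of the status value might look like the sketch below. The stand-in classes are hypothetical and exist only so the snippet runs without Twisted; the .exitCode attribute is the one described in section 4, while the .signal attribute is an assumption about what the value object carries when the child is killed by a signal.

```python
class FakeDone:
    """Stand-in for the value carried when the child exits with code 0."""
    exitCode = 0
    signal = None


class FakeTerminated:
    """Stand-in for the value carried when the child fails or is signalled."""

    def __init__(self, exitCode=None, signal=None):
        self.exitCode = exitCode
        self.signal = signal  # assumed attribute name


def describe_exit(value):
    """Summarize a processEnded status value in one line."""
    signal_number = getattr(value, "signal", None)
    if signal_number is not None:
        return "killed by signal %d" % signal_number
    exit_code = getattr(value, "exitCode", None)
    if exit_code == 0:
        return "exited normally"
    return "exited with code %s" % exit_code


if __name__ == "__main__":
    for value in (FakeDone(), FakeTerminated(exitCode=2), FakeTerminated(signal=9)):
        print(describe_exit(value))
```

In the protocol, the handler would call describe_exit(status_object.value) in place of formatting the exit code with %d.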

7. Doing it the Easy Way

Frequently, one just needs a simple way to get all the output from a program. In the blocking world, you might use commands.getoutput from the standard library (subprocess.getoutput in Python 3), but using that in an event-driven program will cause everything else to stall until the command finishes. (In addition, the SIGCHLD handler used by that function does not play well with Twisted's own signal handling.) For these cases, the twisted.internet.utils.getProcessOutput function can be used. Here is a simple example:

    from twisted.internet import protocol, utils, reactor

    class FortuneQuoter(protocol.Protocol):

        fortune = '/usr/games/fortune'

        def connectionMade(self):
            # getProcessOutput returns a Deferred that fires with the output.
            output = utils.getProcessOutput(self.fortune)
            output.addCallbacks(self.writeResponse, self.noResponse)

        def writeResponse(self, resp):
            self.transport.write(resp)
            self.transport.loseConnection()

        def noResponse(self, err):
            self.transport.loseConnection()


    if __name__ == '__main__':
        f = protocol.Factory()
        f.protocol = FortuneQuoter
        reactor.listenTCP(10999, f)
        reactor.run()

If you only need the final exit code (like commands.getstatusoutput(cmd)[0]), the twisted.internet.utils.getProcessValue function is useful. Here is an example:

    from twisted.internet import utils, reactor

    def printTrueValue(val):
        print("/bin/true exits with rc=%d" % val)
        # Chain the second command once the first one has finished.
        output = utils.getProcessValue('/bin/false')
        output.addCallback(printFalseValue)

    def printFalseValue(val):
        print("/bin/false exits with rc=%d" % val)
        reactor.stop()

    output = utils.getProcessValue('/bin/true')
    output.addCallback(printTrueValue)
    reactor.run()

7.1. ProcessProtocols with extra file descriptors

When you provide a childFDs dictionary with more than the normal three fds, you need additional methods to access those pipes. These methods are more general than the .outReceived ones described above. In fact, those methods (outReceived and errReceived) are just wrappers left in for compatibility with older code, written before this generalized fd mapping was implemented. The new list of things that can happen to your ProcessProtocol is as follows:

  • .connectionMade: This is called when the program is started.
  • .childDataReceived(childFD, data): This is called with data that was received from one of the process' output pipes (i.e. where the childFDs value was "r"). The actual file number (from the point of view of the child process) is in childFD. For compatibility, the default implementation of .childDataReceived dispatches to .outReceived or .errReceived when childFD is 1 or 2.
  • .childConnectionLost(childFD): This is called when the reactor notices that one of the process' pipes has been closed. This means either that you have just closed down the parent's end of the pipe (with .transport.closeChildFD), that the child closed the pipe explicitly (sometimes to indicate EOF), or that the child process has terminated and the kernel has closed all of its pipes. The childFD argument tells you which pipe was closed. Note that you can only find out about file descriptors which were mapped to pipes: when they are mapped to existing fds, the parent has no way to notice when they have been closed. For compatibility, the default implementation dispatches to .inConnectionLost, .outConnectionLost, or .errConnectionLost.
  • .processEnded(status): This is called when the child process has been reaped and all pipes have been closed. This ensures that all data written by the child prior to its death will be received before .processEnded is invoked.
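
The compatibility dispatch described for .childDataReceived can be pictured with the following sketch. It is not Twisted's implementation: DispatchSketch is a hypothetical stand-in that mimics the described behaviour for fds 1 and 2, and the per-fd collection in the else branch is an illustrative extension for extra descriptors.

```python
class DispatchSketch:
    """Not Twisted's code: a stand-in that mimics the described dispatch."""

    def __init__(self):
        self.out = b""
        self.err = b""
        self.extra = {}

    def childDataReceived(self, childFD, data):
        if childFD == 1:
            self.outReceived(data)
        elif childFD == 2:
            self.errReceived(data)
        else:
            # Illustrative extension: collect any other readable fd by number.
            self.extra[childFD] = self.extra.get(childFD, b"") + data

    def outReceived(self, data):
        self.out += data

    def errReceived(self, data):
        self.err += data


proto = DispatchSketch()
proto.childDataReceived(1, b"plaintext")
proto.childDataReceived(2, b"warning")
proto.childDataReceived(4, b"status ")
proto.childDataReceived(4, b"line")
print(proto.out, proto.err, proto.extra)
```

A protocol that only cares about stdout and stderr can therefore keep overriding outReceived and errReceived, while one that maps extra fds overrides childDataReceived itself.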

In addition to those methods, there are other methods available to influence the child process:

  • self.transport.writeToChild(childFD, data): Stuff some data into an input pipe. .write simply writes to childFD=0.
  • self.transport.closeChildFD(childFD): Close one of the child's pipes. Closing an input pipe is a common way to indicate EOF to the child process. Closing an output pipe is neither very friendly nor very useful.
  • os.kill(self.transport.pid, signal.SIGKILL): Kill the child process. This will eventually result in processEnded being called.

7.2. Example

GnuPG, the encryption program, can use additional file descriptors to accept a passphrase and emit status output. These are distinct from stdin (used to accept the crypttext), stdout (used to emit the plaintext), and stderr (used to emit human-readable status/warning messages). The passphrase FD is read until the pipe is closed, and the resulting string is used to unlock the secret key that performs the actual decryption. The status FD emits machine-parseable status messages to indicate the validity of the signature, which key the message was encrypted to, and so on.

gpg accepts command-line arguments to specify what these fds are, and then assumes that they have been opened by the parent before the gpg process is started. It simply performs reads and writes on these fd numbers.

To invoke gpg in decryption/verification mode, you would do something like the following:

    from twisted.internet import reactor
    from twisted.internet.defer import Deferred
    from twisted.internet.protocol import ProcessProtocol

    class GPGProtocol(ProcessProtocol):
        def __init__(self, crypttext, passphrase):
            self.crypttext = crypttext
            # passphrase is supplied by the caller (an assumption of this
            # listing); like crypttext it is bytes, written to fd 3 below.
            self.passphrase = passphrase
            self.plaintext = b""
            self.status = b""
        def connectionMade(self):
            self.transport.writeToChild(3, self.passphrase)
            self.transport.closeChildFD(3)
            self.transport.writeToChild(0, self.crypttext)
            self.transport.closeChildFD(0)
        def childDataReceived(self, childFD, data):
            if childFD == 1: self.plaintext += data
            if childFD == 4: self.status += data
        def processEnded(self, status):
            rc = status.value.exitCode
            if rc == 0:
                self.deferred.callback(self)
            else:
                # errback() expects a Failure or exception, so pass the
                # Failure on; its .value still carries the exit code.
                self.deferred.errback(status)

    def decrypt(crypttext, passphrase):
        gp = GPGProtocol(crypttext, passphrase)
        gp.deferred = Deferred()
        cmd = ["gpg", "--decrypt", "--passphrase-fd", "3", "--status-fd", "4",
               "--batch"]
        p = reactor.spawnProcess(gp, cmd[0], cmd, env=None,
                                 childFDs={0:"w", 1:"r", 2:2, 3:"w", 4:"r"})
        return gp.deferred

In this example, the status output is collected and could be parsed after the fact. It could, of course, be parsed on the fly, as it is a simple line-oriented protocol. Methods from LineReceiver could be mixed in to make this parsing more convenient.
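
Parsing the status output on the fly mostly means coping with lines that are split across calls. The sketch below shows one way to do that without LineReceiver. It rests on assumptions: StatusLineBuffer is a hypothetical helper, the "[GNUPG:] " line prefix and the DECRYPTION_OKAY and GOODSIG keywords reflect GnuPG's status format as commonly documented, and the key id and user name in the sample data are made up.

```python
class StatusLineBuffer:
    """Split a byte stream into complete lines, keeping any partial tail."""

    PREFIX = "[GNUPG:] "  # assumed prefix of every status line

    def __init__(self):
        self._pending = b""
        self.events = []

    def feed(self, data):
        self._pending += data
        # Everything before the last newline is complete; the rest waits.
        *complete, self._pending = self._pending.split(b"\n")
        for line in complete:
            self._handle_line(line.decode("utf-8", "replace"))

    def _handle_line(self, line):
        if line.startswith(self.PREFIX):
            keyword, _, args = line[len(self.PREFIX):].partition(" ")
            self.events.append((keyword, args))


buf = StatusLineBuffer()
buf.feed(b"[GNUPG:] DECRYP")
buf.feed(b"TION_OKAY\n[GNUPG:] GOODSIG 0123ABCD Example User\n[GN")
print(buf.events)
```

In GPGProtocol, childDataReceived could call buf.feed(data) when childFD is 4, so that events are available as soon as each status line is complete.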

The stderr mapping (2:2) used here causes any GPG errors to be emitted by the parent program, just as if those errors had occurred in the parent itself. This is sometimes desirable (it roughly corresponds to letting exceptions propagate upwards), especially if you do not expect to encounter errors in the child process and want them to be more visible to the end user. The alternative is to map stderr to a read-pipe and handle any such output from within the ProcessProtocol (roughly corresponding to catching the exception locally).
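
To make the childFDs mapping used in the example easier to read, the following sketch spells out what each kind of value means, based on the descriptions in this section. describe_child_fds is a hypothetical helper written for illustration; it is not part of Twisted.

```python
def describe_child_fds(child_fds):
    """Return {child fd: description} for a childFDs-style mapping."""
    descriptions = {}
    for child_fd, target in sorted(child_fds.items()):
        if target == "r":
            descriptions[child_fd] = "pipe: parent reads what the child writes"
        elif target == "w":
            descriptions[child_fd] = "pipe: parent writes, child reads"
        elif isinstance(target, int):
            descriptions[child_fd] = "shares the parent's fd %d" % target
        else:
            raise ValueError("unrecognized childFDs value: %r" % (target,))
    return descriptions


if __name__ == "__main__":
    mapping = {0: "w", 1: "r", 2: 2, 3: "w", 4: "r"}
    for fd, text in describe_child_fds(mapping).items():
        print(fd, text)
```

Changing 2:2 to 2:"r" in the mapping would switch stderr from being shared with the parent to being delivered to the ProcessProtocol.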

  • Translated by Jerry Marx.


Version: 1.3.0