https://blog.csdn.net/weixin_39059031/article/details/108440342


前言

进程与线程

  进程中的一个执行任务(控制单元),负责当前进程中程序的执行。一个进程至少有一个线程,一个进程可以运行多个线程,多个线程可共享数据。

  与进程不同的是同类的多个线程共享进程的堆和方法区资源,但每个线程有自己的程序计数器、虚拟机栈和本地方法栈,所以系统在产生一个线程,或是在各个线程之间作切换工作时,负担要比进程小得多,也正因为如此,线程也被称为轻量级进程

  进程是资源分配的最小单位,线程是CPU调度的最小单位。做个简单的比喻:进程=火车,线程=车厢。

  python标准库中有一个多进程模块multiprocesing,它可以支持在代码创建多个进程协同运行的计算模型,此模块很多接口名和参数,都与多线程一致。

从代码上看多进程,依然是给进程指定一个函数作为入口,python底层自动启动一个独立进程从此入口开始执行。如果是GUI程序,多进程可以更好的实现多个tkinter的root窗口。

  • 为什么要多进程?

  如果IO密集型的任务,CPU大部分时间闲着的,可以用于干其它的事情,多线程就可以了, 比如CPU比硬盘,内存好很多,此时,系统运作,大部分的状况是CPU在等I/O(硬盘/内存) 的读/写操作。

  如果是计算密集型的任务,有多个CPU时,想要减少CPU在各个资源调度上的时间,就要考虑多进程,以充分利用资源。

  CPython解释器中的GIL(Global Intercepto Lock,全局解释器锁。GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念),限制了多线程无法充分利用多CPU资源,因此有了多进程的用武之地。

  • 并行和并发并行和并发同属于多任务,目的是要提高CPU的使用效率。这里需要注意的是,一个CPU永远不可能实现并行,即一个CPU不能同时运行多个程序,但是可以在随机分配的时间片内交替执行(并发),就好像一个人不能同时看两本书,但是却能够先看第一本书半分钟,再看第二本书半分钟,这样来回切换。

  • GIL锁

  Guido van Rossum(吉多·范罗苏姆)创建Python时就只考虑到单核CPU,解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁,于是有了GIL这把超级大锁。因为cpython解析只允许拥有GIL全局解析器锁才能运行程序,这样就保证了保证同一个时刻只允许一个线程可以使用cpu。由于大量的程序开发者接收了这套机制,现在代码量越来越多,已经不容易通过C代码去解决这个问题。

  每个线程在执行时候都需要先获取GIL,保证同一时刻只有一个线程可以执行代码,即同一时刻只有一个线程使用CPU,也就是说多线程并不是真正意义上的并行执行。

那么,我们改如何解决GIL锁的问题呢?

  1. 更换cpython为jpython(不建议)
  2. 使用多进程完成多线程的任务
  3. 在使用多线程可以使用c语言去实现

问题1: 什么时候会释放Gil锁?

  1. 遇到像 i/o操作这种 会有时间空闲情况 造成cpu闲置的情况会释放Gil
  2. 会有一个专门ticks进行计数 一旦ticks数值达到100 这个时候释放Gil锁 线程之间开始竞争Gil锁(说明:ticks这个数值可以进行设置来延长或者缩减获得Gil锁的线程使用cpu的时间)

问题2: 互斥锁和Gil锁的关系

Gil锁 : 保证同一时刻只有一个线程能使用到CPU。
互斥锁 : 多线程时,保证修改共享数据时有序的修改,不会产生数据修改混乱。
GIL与多线程:有了GIL的存在,Python有这两个特点:

  1. 进程可以利用多核,但是开销大。
  2. 多线程开销小,却无法利用多核优势。

  也就是说Python中的多线程是假的多线程,Python解释器虽然可以开启多个线程,但同一时间只有一个线程能在解释器中执行,而做到这一点正是由于GIL锁的存在,它的存在使得CPU的资源同一时间只会给一个线程使用,而由于开启线程的开销小,所以多线程才能有一片用武之地,不然就真的是鸡肋了。

多进程multiprocessing

多进程实现方法

  多进程的实现方法可以有两种,一种是直接设置子进程为一个函数和设置子进程为一个类,继承Process:

  1. 通过Python的Multiprocessing模块创建多进程的计算模型
from multiprocessing import Process import time import random def fun1(i): time.sleep(random.randint(0,2)) print('multiprocess test %d' %i) if __name__ == '__main__': process_list = [] for i in range(5): #开启5个子进程执行fun1函数 p = Process(target=fun1,args=(i,)) #实例化进程对象 p.start() process_list.append(p) for p in process_list: p.join() print('Done!') 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

  通过multiprocessing模块的Process创建进程对象,然后start。join用来阻塞等待进程执行结束。接口基本与多线程一致。p.join()的意思是等待子进程结束后才执行后续的操作,一般用于进程间通信。例如有一个读进程pw和一个写进程pr,在调用pw之前需要先写pr.join(),表示等待写进程结束之后才开始执行读进程。

  多进程的入口也就是一个普普通通的函数!但是它是以独立进程的方式运行。这里子进程的运行是并发的,不能控制其先后顺序,想要按照顺序获取某些结果的话,需要额外的控制信息。

  1. 用继承的方法创建进程
from multiprocessing import Process import time import random class MyProcess(Process): #继承Process类 def __init__(self,name): super(MyProcess,self).__init__() self.name = name def run(self): time.sleep(random.randint(0,2)) print('测试多进程 %s' % self.name) if __name__ == '__main__': process_list = [] for i in range(5): #开启5个子进程执行fun1函数 p = MyProcess(str(i)*8) #实例化进程对象 p.start() process_list.append(p) for p in process_list: p.join() print('Done!') 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

  上述代码同样不考虑子进程的运行结果以及顺序。

  并且不考虑子进程的返回结果,那此时如果子进程想要返回一些结果给父进程该如何实现呢?

返回子进程的结果

  想要返回子进程的结果可以考虑使用:多进程共享全局变量之Manager()或者进程通信。

  • Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。

  主进程与子进程是并发执行的,进程之间默认是不能共享全局变量的(子进程不能改变主进程中全局变量的值)。如果要共享全局变量需要(multiprocessing.Value("d",10.0),数值)(multiprocessing.Array("i",[1,2,3,4,5]),数组)(multiprocessing.Manager().dict(),字典)(multiprocessing.Manager().list(range(5)))。

  • 进程通信(进程之间传递数据)用进程队列(multiprocessing.Queue(),单向通信),管道(multiprocessing.Pipe(),双向通信)。

多进程共享全局变量之Manager()

import multiprocessing as mp from multiprocessing import Manager def job(temp, i): for j in range(i): temp.append(j) if __name__ == "__main__": seed_episodes = 3 process_list = [] manager = Manager() # temp_list = manager.list() # manager.dict() temp_lists = [manager.list() for i in range(seed_episodes)] for i in range(seed_episodes): p = mp.Process(target=job, args=(temp_lists[i],i,)) p.start() process_list.append(p) for i, p in enumerate(process_list): p.join() print('temp {}'.format(list(temp_lists[i]))) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

  但是这里如果你想要对字典进行深度赋值的话就会出现问题,比如你的字典的值是一个列表,而你想要对这个列表再进行操作就会赋值无效,他也不报错,就需要小心一点。更详细的可以参考python多进程变量Manager.dict() | 深度赋值无效问题解决

  但是这个只限制到数组、数字、字典这些,我们能不能做一个自己的类呢,让我们自己定义的类实现全局共享?

  • 自定义全局共享类

  上文中的全局共享数据类型是通过Manager()的方法来实现的。我们也可以仿照这个做一个类似的:

  1. 导入基类
from multiprocessing.managers import BaseManager class Base_Manager(BaseManager): pass def my_manager(): m = Base_Manager() m.start() return m
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 定义一个自己要实现的类
from multiprocessing import Process, Value, Lock class Employee(object): def __init__(self, name, salary): self.name = name
        self.salary = Value('i', salary) self.data = [] def increase(self): self.salary.value += 100 self.data.append(self.salary.value) print(self.data) def getPay(self): return self.name + ':' + str(self.salary.value) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  1. 主程序调用
import multiprocessing as mp from multiprocessing import Process, Value, Lock from XXX import Base_Manager, my_manager # 从创建的文件中导入定义好的基类 import Employee # 导入自定义类 Base_Manager.register('Employee', Employee) def func1(em, lock): with lock: em.increase() if __name__ == '__main__': mp.set_start_method("spawn") manager = my_manager() em = manager.Employee('zhangsan', 1000) lock = Lock() proces = [Process(target=func1, args=(em, lock)) for i in range(10)] for p in proces: p.start() for p in proces: p.join() print(em.getPay()) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

  这样确实可以,但是这个全局的类,这个是没有办法debug的,但是可以调用这个类下的方法,并且能够得到返回值。

  还查到一个调试多进程的方法,大体意思是:把from multiprocessing import Process换成from multiprocessing.dummy import Process。

多进程队列Queue通信

  multiprocess.Queue,是提供个python多进程见通信使用的。python有一个好的设计,即多线程和多进程的接口基本相同,现在这两个Queue的使用接口也基本相同:

from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) def g(q): q.put([48, None, 'hello gggg']) def h(q): print(q.get()) print(q.get()) if __name__ == '__main__': q = Queue() p1 = Process(target=f, args=(q,)) p2 = Process(target=g, args=(q,)) p3 = Process(target=h, args=(q,)) p1.start() p2.start() p3.start() p1.join() p2.join() p3.join() 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

主进程开了3个子进程,两个做put,一个做get。运行结果:

[42, None, 'hello'] [48, None, 'hello gggg'] 
        
  • 1
  • 2

多进程管道Pipe通信

  进程队列通信multiprocessing.Queue()是单向通信,管道通信multiprocessing.Pipe()是双向通信。

  创建Pipe时会返回2个连接对象(conn1,conn2),代表管道的两端,默认是双向通信的,即conn1和conn2都可以收发消息。一般命名为一个父进程的管道,一个是子进程的管道。

  Pipe是数据不安全的,所以如果是多个进程之间同时收发消息时,需要自己加锁以达到数据安全。

  • 管道通信区分子进程返回值

  如果想要实现获取子进程的返回值,并且区分的话,可以采用给传入值打上标签或者采用多个管道:

from multiprocessing import Process,Pipe import time class Worker(Process): def __init__(self, conn, i): super(Worker, self).__init__() self.conn = conn
        self.i = i def run(self) -> None: time.sleep(3-self.i) metrics_recv = self.conn.recv() metrics_recv['steps'].append(self.i) self.conn.send(metrics_recv) self.conn.close() if __name__ == "__main__": metrics = [({'steps': [i]}) for i in range(3)] pipes = [Pipe() for i in range(3)] workers = [Worker(child, i) for i, [parent, child] in enumerate(pipes)] for i, w in enumerate(workers): w.start() pipes[i][0].send(metrics[i]) [w.join() for w in workers] for i in pipes: metrics = i[0].recv() print("metrics_recv", metrics) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

  但是这个管道不能传太大的数据,比如传个tensor数组的话可能就会爆了,因此最好还是设置全局共享区域。

subprocess模块

  Python的subprocess模块,用来创建和管理子进程(不是线程),并能够与创建的子进程的stdin,stdout,stderr连接通信,获取子进程执行结束后的返回码,在执行超时或执行错误时得到异常。

  从Python3.5版本开始,subprocess模块内部又进行了一次整合 ,最后就剩下官方推荐的两个接口函数,分别是:

subprocess.run() subprocess.Popen() 
        
  • 1
  • 2

  考虑到这个模块对外接口的函数和对象名称都比较特别,本文就这样来引入:

>>> from subprocess import * >>> dir() ['CalledProcessError', 'CompletedProcess', 'DEVNULL', 'PIPE', 'Popen', 'STDOUT', 'SubprocessError', 'TimeoutExpired', '__annotations__', '__builtins__', '__doc__', '__loader__', '__name__', '__package__', '__spec__', 'call', 'check_call', 'check_output', 'getoutput', 'getstatusoutput', 'run'] 
        
  • 1
  • 2
  • 3

  call,check_call,check_output,getoutput,getstatusoutput这些函数,都被run函数代替了,它们在存在只是为了保持向下兼容。

subprocess.run() 函数的使用

  从Python3.5开始,出现了run函数,用来代替之前版本的一些函数接口。run函数的作用是:执行args参数所表示的命令,等待命令执行完毕,返回一个CompletedProcess对象。

  注意:run函数是同步函数,要等待

  • 同步函数:当一个函数是同步执行时,那么当该函数被调用时不会立即返回,直到该函数所要做的事情全都做完了才返回。
  • 异步函数:如果一个异步函数被调用时,该函数会立即返回尽管该函数规定的操作任务还没有完成。

  run()函数的接口参数:

subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, encoding=None, errors=None, text=None, env=None, universal_newlines=None) 
        
  • 1
  • 2
  • 3
  • 4
  1. args参数,就是要通过创建进程而执行的命令及参数,run函数通过args来创建一个进程并执行。

  2. shell参数,表示是否通过shell来执行命令(Linux下默认为/bin/sh),默认是False,这时args只能是一个不带参数的命令字符串,或者是命令和参数组成的一个list,如果shell=True,args就可以是一个我们常见的命令字符串。

>>> run('ls') >>> run(['ls','-lh']) >>> run('ls -lh', shell=True) 
        
  • 1
  • 2
  • 3

  注意run函数返回的CompletedProcess对象,里面包含了args,以及命令执行的返回码。下面的代码示例,说明了访问CompletedProcess对象的方式:

>>> proc = run('ls') Desktop    Downloads      Music     Public     test
Documents  examples.desktop  Pictures  Templates  Videos >>> proc.args 'ls' >>> proc.returncode 0 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

  input参数,命令的具体输入内容,默认None,表示没有输入。input与stdin不能同时使用。先看一个有input参数的例子:

>>> proc = run('grep fs',shell=True,input=b'adfs\ncccc\nfsfsf') adfs
fsfsf >>> proc
CompletedProcess(args='grep fs', returncode=0) # input默认是一个bytes流。 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. stdin参数:指定命令的输入途径;
>>> f = open('tt.t','r') >>> proc = run('cat -n', shell=True, stdin=f) 1 12345 2 abcde 3 xyz.. >>> f.close() 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. stdout参数:指定命令的输出途径;默认为None,如上面的代码示例,输出就直接打印出来了;
>>> proc = run('grep fs',shell=True,input=b'adfs\ncccc\nfsfsf',stdout=PIPE) >>> proc
CompletedProcess(args='grep fs', returncode=0, stdout=b'adfs\nfsfsf\n') >>> proc.stdout
b'adfs\nfsfsf\n' 
        
  • 1
  • 2
  • 3
  • 4
  • 5

  stdout=PIPE,表示将stdout重定向到管道,用了这个参数,grep fs命令的结果,就不会直接打印出来,而是存入了proc.stdout这个管道内。

  1. stderr参数:指定命令的error输出途径;
>>> proc = run('ls fs',shell=True,stdout=PIPE,stderr=PIPE) >>> proc.stdout
b'' >>> proc.stderr
b"ls: cannot access 'fs': No such file or directory\n" 
        
  • 1
  • 2
  • 3
  • 4
  • 5

  看一个stdout与input配合起来使用的例子,有点像我们在Linux shell输入的有管道的命令行:

>>> proc = run('grep fs',shell=True,input=b'adfs\ncccc\nfsfsf',stdout=PIPE) >>> run('cat -n',shell=True, input=proc.stdout) 1 adfs 2 fsfsf
CompletedProcess(args='cat -n', returncode=0) 
        
  • 1
  • 2
  • 3
  • 4
  • 5

  把stderr重定向到stdout:

>>> proc = run('ls kk', shell=True, stdout=PIPE, stderr=STDOUT) >>> proc.stdout
b"ls: cannot access 'kk': No such file or directory\n" 
        
  • 1
  • 2
  • 3
  1. capture_output参数:这个参数顾名思义就是捕获进程的输出,stdout和stderr。capture_output=True的效果与设置stdout=PIPE, stderr=PIPE一样。设置了capture_output=True,就不能再设置stdout和stderr:
>>> proc = run('ls kk', shell=True, capture_output=True) >>> proc
CompletedProcess(args='ls kk', returncode=2, stdout=b'', stderr=b"ls: cannot access 'kk': No such file or directory\n") >>> proc.stdout
b'' >>> proc.stderr
b"ls: cannot access 'kk': No such file or directory\n" 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  1. cwd参数:这个参数指示了当前工作路径:
>>> proc = run('ls -lh', shell=True, cwd='/usr/local') total 36K
drwxr-xr-x 2 root root 4.0K Feb 9 16:12 bin drwxr-xr-x 2 root root 4.0K Feb 9 16:12 etc
drwxr-xr-x 2 root root 4.0K Feb 9 16:12 games
drwxr-xr-x 2 root root 4.0K Feb 9 16:12 include
drwxr-xr-x 3 root root 4.0K Jun 28 21:54 lib
lrwxrwxrwx 1 root root 9 Jun 28 21:32 man -> share/man
drwxr-xr-x 6 root root 4.0K Jun 28 23:34 python-3.7.3 drwxr-xr-x 2 root root 4.0K Feb 9 16:12 sbin
drwxr-xr-x 6 root root 4.0K Feb 9 16:15 share
drwxr-xr-x 2 root root 4.0K Feb 9 16:12 src
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  1. text参数,universal_newlines参数

  这两个参数的作用是一样的,universal_newlines这个参数的存在也是为了向下兼容(Python3.7开始有text参数,3.5和3.6都是universal_newlines参数),因此我们使用text就好了。text参数的作用是,将stdin,stdout,stderr修改为string模式。上面的示例代码,都是bytes流:

>>> run('grep fs', shell=True, input=b'asdfs\nfdfs', capture_output=True) CompletedProcess(args='grep fs', returncode=0, stdout=b'asdfs\nfdfs\n', stderr=b'') >>> run('grep fs', shell=True, input='asdfs\nfdfs', capture_output=True, text=True) CompletedProcess(args='grep fs', returncode=0, stdout='asdfs\nfdfs\n', stderr='') 
        
  • 1
  • 2
  • 3
  • 4
  1. timeout参数:设置进程执行的超时时间。如果时间到子进程还未结束,subprocess.TimeoutExpired异常会抛出。timeout参数的单位是秒。
>>> try: ... run('python3', shell=True, input=b'import time;time.sleep(30)', timeout=1) ... except TimeoutExpired: ... print('timeout happened...') ... timeout happened... 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

以上代码,就是sleep 30秒,run函数设置timeout为1秒,触发subprocess.TimeoutExpired后,打印一点信息出来。

  1. check参数:如果check=True,在子进程的返回不为0的时候,抛出subprocess.CalledProcessError异常。这时,run函数返回的CompletedProcess对象的returncode不可用。
>>> try: ... proc = run('ls kk', shell=True, check=True, stderr=PIPE) ... except CalledProcessError: ... print(proc.returncode) ... 0 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  上面这段代码,走到了except里面,因为kk目录不存在,但是打印出来的returncode却是0,run函数没有成功返回,而是抛出异常,因此返回值不可用。

subprocess.Popen()函数

  run函数的底层,就是Popen函数。run函数是同步的,要等待子进程实行结束,或者超时。Popen创建子进程后,采用异步的方式,不会等待,要通过poll函数来判断子进程是否执行完毕。

Popen(args, bufsize=-1, executable=None, stdin=None, stdout=None, stderr=None, preexec_fn=None, close_fds=True, shell=False, cwd=None, env=None, universal_newlines=None, startupinfo=None, creationflags=0, restore_signals=True, start_new_session=False, pass_fds=(), *, encoding=None, errors=None, text=None) 
        
  • 1
  • 2
  • 3
  • 4
  • 5

  参数args,stdin,stdout,stderr,shell,cwd,universal_newlines,text与run函数的含义和用法都是一样的。

  1. Popen函数的基本用法
>>> proc = Popen('ls -hl', shell=True, stdout=PIPE, stderr=STDOUT) >>> out, _ = proc.communicate() >>> print(out.decode()) total 37M -rw-r--r-- 1 xinlin xinlin 535 Jun 29 06:03 apache_log_reader.py -rw-r--r-- 1 xinlin xinlin 3.2M Jun 30 02:55 py.maixj.sql -rw-r--r-- 1 xinlin xinlin 3.2M Jun 29 19:20 py.online.sql
drwxr-xr-x 19 xinlin xinlin 4.0K Jun 28 23:24 Python-3.7.3 -rw-r--r-- 1 xinlin xinlin  22M Mar 25 13:59 Python-3.7.3.tgz -rw-r--r-- 1 xinlin xinlin 27 Jul 5 01:05 sleep.py -rw-r--r-- 1 xinlin xinlin 18 Jul 5 00:10 tt.t -rw-r--r-- 1 xinlin xinlin 800 Jun 29 03:26 walktree.py -rw-r--r-- 1 xinlin xinlin 8.2M Jun 29 05:47 www.access_log_2019_06_28 >>> proc.returncode 0 >>> proc.pid 2985 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

  Popen函数以异步的方式创建一个子进程,返回一个Popen对象。我们通过communicate函数来获取stdout和stderr。communicate函数返回一个tuple,以上示例是将stderr=STDOUT,因此使用 _ 来表示为空的stderr。

  因为subprocess模块是用调用shell命令的方式创建进程,我们可以直接用这一行shell命令启动后台进程:

import subprocess
subprocess.Popen('python3.8 start.py', shell=True) 
        
  • 1
  • 2
  1. 设置子进程的输入和超时时间

  Popen对象的communicate函数有两个参数,input和timeout,分别用来设置给子进程的输入和超时时间。有timeout参数,表示communicate函数会等待子进程执行结束,或者超时。

>>> proc = Popen('grep fs', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) >>> out, err = proc.communicate(b'adfs\nfsmnjkl') >>> out
b'adfs\nfsmnjkl\n' >>> err
b'' 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

  1.Popen以异步的方式创建子进程,创建时可以设定stdin,stdout和stderr全部指向PIPE,此时子进程的输入输出全部都在管道中,就像我们再shell命令行直接使用管道(|)一样!

  2. 在使用管道(|)连接多个程序的时候,前一个程序的输出成为了后一个程序的输入,此时如果假设后一个程序时通过subprocess的Popen创建的,那么此时此子进程的stdin,就是前一个程序的输出,而它的stdout和stderr,通过communicate函数,可以直接获得。

  通过子进程的communicate函数,我们可以像使用shell的管道一样,直接连接多个程序的输入和输出;但是,这种输入和输出,也跟shell管道一样,是一次性的;即如果某个程序有运行时会连续多次获取输入,communicate就无能为力(此时就要使用pexpect)。

  再来一个有timeout的例子:

>>> proc = Popen('python3', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE) >>> try: ... out,err=proc.communicate(b'import time;time.sleep(30)', 1) ... except TimeoutExpired: ... print('time out...') ... time out... 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

  Popen对象有一个wait成员函数,也可以设置一个timeout来等待子进程的结束:

>>> try: ... proc=Popen('python3 -c "import time;time.sleep(30)"',shell=True,stdout=PIPE) ... returncode = proc.wait(15) ... except TimeoutExpired: ... print('after waiting 15 seconds, timeout finally...') ... after waiting 15 seconds, timeout finally... 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

  注意对returncode的赋值,如果timeout发生,returncode就是not defined。当然也可以通过proc.returncode来获取。如果异常,proc.returncode的值是None。

  如果想确定子进程执行的时长,可以采用poll函数:

>>> def test_Popen(): ... import time ... proc=Popen('python3 -c "import time;time.sleep(10)"',shell=True,stdout=PIPE) ... i = 0 ... while True: ... returncode = proc.poll() ... if returncode is None: ... time.sleep(2) ... i += 2 ... print('sleep',i,'seconds') ... continue ... else: ... print('sub process is terminated with returncode',returncode) ... break ... >>> test_Popen() sleep 2 seconds
sleep 4 seconds
sleep 6 seconds
sleep 8 seconds
sleep 10 seconds
sleep 12 seconds
sub process is terminated with returncode 0 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

  Popen对象还有下列几个成员函数:

Popen.send_signal(signal) Popen.terminate() Popen.kill() 
        
  • 1
  • 2
  • 3
  • 更多参考官方文档:https://docs.python.org/3/library/subprocess.html

实时获取subprocess子进程的输出

大体思路:既然是实时获取subprocess创建的子进程的输出,我们就不能使用run,而要使用Popen创建异步子进程,然后通过poll函数来查看异步子进程的执行状态,如果执行没有结束,我们就直接去读取子进程的stdout,然后在主进程中处理。

  1. 子进程代码,代码在subproc.py文件中:
import time while True: print('subprocess print...', flush=True) time.sleep(1) 
        
  • 1
  • 2
  • 3
  • 4

注意:print函数一定要使用flush=True,否则子进程的输出都在缓存中,主进程也无法读取出来!

  1. 主进程代码,在proc.py文件中:
from subprocess import * proc = Popen('python subproc.py', shell=True, stdout=PIPE, stderr=PIPE) while True: rcode = proc.poll() if rcode is None: print('from subprocess: ', end='') line = proc.stdout.readline().strip() print(line.decode()) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  1. 两个.py文件放在同一目录下,下面是执行效果:
E:\py>python proc.py from subprocess: subprocess print... from subprocess: subprocess print... from subprocess: subprocess print... from subprocess: subprocess print... from subprocess: subprocess print... from subprocess: Traceback (most recent call last): File "proc.py", line 10, in line = proc.stdout.readline().strip() KeyboardInterrupt
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

  用这个技术,可以在python命令行程序上套一层GUI的壳!在GUI程序中,用subprocess以子进程的方式启动命令行程序,实时获取子进程的输出并在GUI中显示出来。

不过有个问题,原命令行程序的print如果没有加flush=True参数怎么办?可以使用python命令行的-u参数!

如果subproc.py的代码是这样的,即print没有flush=True:

import time while True: print('subprocess print...') time.sleep(1) 
        
  • 1
  • 2
  • 3
  • 4

proc.py的代码修改如下,使用 -u 参数:

from subprocess import * proc = Popen('python -u subproc.py', shell=True, stdout=PIPE, stderr=PIPE) while True: rcode = proc.poll() if rcode is None: print('from subprocess: ', end='') line = proc.stdout.readline().strip() print(line.decode()) 
        
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

原创文章,转载请注明出处:http://124.221.219.47/article/456o90/