python 多进程和多线程配合

帮忙改下代码谢谢
import sys
# NOTE(review): Python 2-only hack. `reload` (as a builtin) and
# `sys.setdefaultencoding` do not exist on Python 3, so these two lines fail
# there. They switch the interpreter-wide default codec used for implicit
# str<->unicode conversions to UTF-8, which can hide encoding bugs elsewhere;
# consider removing them and encoding/decoding explicitly at I/O boundaries.
reload(sys)
sys.setdefaultencoding('utf-8')

from multiprocessing import Process, Pool
from logindev import everysw
from logindev import mtsub

import time
import threadpool
import Queue
import datetime

# Work items: pt(a) hands each inner list to mt() in a pool process, and mt()
# submits every tuple of that list to its thread pool.
a = [
    [("a", "b", "c"), ("1", "2", "3"), ("x", "y", "z")],
    [("11", "22", "33"), ("44", "55", "66"), ("77", "88", "99")],
]
print(len(a))

def prt(xxx):
    """Print *xxx* followed by the current local date and time.

    Used as the per-item job submitted to the thread pool in mt().

    :param xxx: any object; it is formatted with str().
    """
    # The file imports the *module* (``import datetime``), so the class has
    # to be reached as datetime.datetime -- ``datetime.now()`` raises
    # AttributeError. A single pre-formatted string keeps the output the
    # same under Python 2's print statement and Python 3's print().
    print('%s %s' % (xxx, datetime.datetime.now()))

def pt(ptpt, processes=1):
    """Run mt() on every group in *ptpt* using a multiprocessing Pool.

    :param ptpt: sequence of groups; each group is passed to mt() in a
        worker process.
    :param processes: number of worker processes (default 1). With a single
        worker the groups are handled one after another; pass e.g.
        len(ptpt) to process the groups in parallel.
    :returns: list of mt() return values, one per group, in input order.
    """
    pool = Pool(processes)
    try:
        results = pool.map(mt, ptpt)
    except BaseException:
        # Don't wait for outstanding work if map() failed or was interrupted.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()
    return results

def mt(data, num_workers=1):
    """Run prt() on every item of *data* using a threadpool.ThreadPool.

    :param data: sequence of items; each item (e.g. a tuple) is passed to
        prt() as ONE positional argument.
    :param num_workers: number of pool threads (default 1).
    """
    pool = threadpool.ThreadPool(num_workers)
    # makeRequests() interprets a *tuple* item as an (args, kwargs) pair.
    # Handing it the raw tuples therefore called prt(*"a", **"b"), which is
    # the "argument after ** must be a mapping, not str" TypeError.
    # Wrap every item explicitly: positional args (item,), no kwargs.
    requests = threadpool.makeRequests(prt, [((item,), None) for item in data])
    for req in requests:
        pool.putRequest(req)
        # Deliberate 1 s pause between submissions (demo pacing).
        time.sleep(1)
    pool.wait()
    # Release the worker threads; otherwise every call leaves idle threads
    # behind inside the (reused) multiprocessing worker process.
    pool.dismissWorkers(num_workers)

# Only start the pool when run as a script: under the "spawn" start method
# every Pool worker re-imports this module, and an unguarded call would be
# executed again in each child.
if __name__ == '__main__':
    pt(a)

root@webserver:/home/app/apsw# python test.py
2
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/threadpool.py", line 158, in run
result = request.callable(*request.args, **request.kwds)
TypeError: prt() argument after ** must be a mapping, not str

第1个回答  2017-12-11
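由于 CPython 的多线程受 GIL(全局解释器锁)限制,同一时刻只有一个线程在执行 Python 字节码,因此 Python 的多线程无法利用多核来加速 CPU 密集型任务;而现在的计算机都是多核的,这样就不能充分利用计算机的多核资源。但是 Python 中的多进程是可以跑在不同的 CPU 上的。(注意:对于下载这类 I/O 密集型任务,线程在等待 I/O 时会释放 GIL,单用多线程通常已经足够。)因此,尝试了多进程+多线程的方式来做一个任务,比如:从中科大的镜像源中下载多个 rpm 包。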
#!/usr/bin/python
import multiprocessing
import re
import subprocess
import threading
import time


def download_image(url):
    """Download *url* into the current working directory with wget.

    :param url: URL of the file to fetch.
    :returns: wget's exit status (0 on success).
    """
    print('*****the %s rpm begin to download *******' % url)
    # argv list, no shell: the URLs are scraped from a remote index page, and
    # interpolating them into a shell command string would let a crafted href
    # run arbitrary commands. '-q' stops concurrent downloads from
    # interleaving progress bars on the terminal.
    return subprocess.call(['wget', '-q', '--', url])


# Captures the target of every <a href="..."> tag in an HTML directory index.
_HREF_RE = re.compile(r'<a href="([^"]+)"')


def get_rpm_url_list(url, index_html=None):
    """Return the absolute URLs of all ``.rpm`` files linked from *url*.

    :param url: URL of a mirror directory listing (normally ending in '/').
    :param index_html: HTML of that listing. When None (default) the page is
        fetched with wget; pass it explicitly to skip the download.
    :returns: list of absolute ``.rpm`` URLs, in page order.
    :raises subprocess.CalledProcessError: if wget cannot fetch *url*.
    """
    try:
        from urllib.parse import urljoin  # Python 3
    except ImportError:
        from urlparse import urljoin  # Python 2

    if index_html is None:
        # Stream the page to stdout rather than saving it as 'index.html', so
        # a stale index.html in the working directory (wget would save the new
        # page as index.html.1) can never be parsed by mistake.
        raw = subprocess.check_output(['wget', '-q', '-O', '-', '--', url])
        index_html = raw.decode('utf-8', 'replace')

    # Keep package links only; listings may also contain '../', column-sort
    # links and subdirectories, none of which are downloadable packages.
    rpm_url_list = [urljoin(url, href)
                    for href in _HREF_RE.findall(index_html)
                    if href.endswith('.rpm')]
    print('the count of rpm list is:  %d' % len(rpm_url_list))
    return rpm_url_list
def multi_thread(rpm_url_list, thread_num=5):
    """Download every URL in *rpm_url_list* with at most *thread_num* threads.

    Threads are started in batches of ``thread_num`` and each batch is joined
    before the next one starts, so a batch lasts as long as its slowest
    download.

    :param rpm_url_list: iterable of URLs, each passed to download_image().
    :param thread_num: batch size, i.e. concurrent downloads (default 5).
    :raises ValueError: if thread_num < 1.
    """
    if thread_num < 1:
        raise ValueError('thread_num must be >= 1, got %r' % (thread_num,))
    urls = list(rpm_url_list)
    for url in urls:
        print('rpm_url is: %s' % url)

    # Walk the list in input order, one slice of thread_num URLs per batch.
    for start in range(0, len(urls), thread_num):
        batch = [threading.Thread(target=download_image, args=(url,))
                 for url in urls[start:start + thread_num]]
        print('**********count********* %d' % len(batch))
        for thread in batch:
            thread.start()
        for thread in batch:
            thread.join()
def multi_process(rpm_url_list, process_num=4):
    """Split *rpm_url_list* into groups and download each group in its own
    process via multi_thread().

    URLs are dealt round-robin: item ``i`` goes to group ``i % process_num``.
    Groups that end up empty (fewer URLs than processes) get no process.

    :param rpm_url_list: iterable of URLs to download.
    :param process_num: maximum number of worker processes (default 4;
        multiprocessing.cpu_count() is a reasonable machine-specific value).
    :raises ValueError: if process_num < 1.
    """
    if process_num < 1:
        raise ValueError('process_num must be >= 1, got %r' % (process_num,))
    urls = list(rpm_url_list)
    # Extended slicing urls[i::n] yields exactly the items with index % n == i.
    groups = [urls[i::process_num] for i in range(process_num)]
    processes = [multiprocessing.Process(target=multi_thread, args=(group,))
                 for group in groups if group]
    for proc in processes:
        proc.start()
    for proc in processes:
        proc.join()
def main(base_url='http://mirrors.ustc.edu.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'):
    """Download every .rpm listed at *base_url* and print the elapsed time.

    Other mirror listings this can be pointed at, e.g.:
      https://mirrors.ustc.edu.cn/centos/7/os/x86_64/Packages/
      http://mirrors.ustc.edu.cn/fedora/development/26/Server/x86_64/os/Packages/u/

    :param base_url: URL of the directory listing to mirror.
    """
    start_time = time.time()
    rpm_list = get_rpm_url_list(base_url)
    multi_process(rpm_list)
    end_time = time.time()
    print('the download time is: %s' % (end_time - start_time))


# Guard required by multiprocessing: with the "spawn" start method each child
# re-imports this module and must not start downloading again.
if __name__ == '__main__':
    main()

代码的功能主要是这样的:
main()方法中调用get_rpm_url_list(base_url)方法,获取要下载的每个rpm包的具体的url地址。其中base_url即中科大基础的镜像源的地址,比如:http://mirrors.ustc.edu.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,这个地址下有几十个rpm包,get_rpm_url_list方法将每个rpm包的url地址拼出来并返回。
multi_process(rpm_url_list)启动多进程方法,在该方法中,会调用多线程方法。该方法启动4个多进程,将上面方法得到的rpm包的url地址进行分组,分成4组,然后每一个组中的rpm包再最后由不同的线程去执行。从而达到了多进程+多线程的配合使用。
代码还有需要改进的地方,比如多进程启动的进程个数和rpm包的url地址分组是硬编码,这个还需要改进,毕竟,不同的机器,适合同时启动的进程个数是不同的。本回答被提问者采纳
相似回答