Python 任务队列 & 多线程并发threading

浮生一境 2019年12月24日 445次浏览

Python 解释器由于设计时有GIL(Global Interpreter Lock)全局锁,导致了多线程无法利用多核。Python 实际运行是利用单核CPU。

由于存在GIL全局锁,导致python中多线程只是交替执行,在4核、8核CPU上,也仅仅能使用1核。

本文聊到的python多线程,是基于单核CPU实现线程交替执行,提高运算效率。

优点: 1、运算速度快 2、共享内存和变量,资源消耗少

threading 线程模块

threading 重点是Thread模块

import threading

目标函数可以实例化一个Thread对象,每一个Thread对象都代表一个线程,可以通过start()方法启动线程。

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Queue队列模块

队列分为几种:FIFO\LIFO\PRIORTY

FIFO 即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

LIFO 即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

PRIORTY 构造一个优先队列。maxsize用法同上。

基本方法: Queue.Queue(maxsize=0) FIFO, 如果maxsize小于1就表示队列长度无限 Queue.LifoQueue(maxsize=0) LIFO, 如果maxsize小于1就表示队列长度无限 Queue.qsize() 返回队列的大小 Queue.empty() 如果队列为空,返回True,反之False Queue.full() 如果队列满了,返回True,反之False Queue.get([block[, timeout]]) 读队列,timeout等待时间 Queue.put(item, [block[, timeout]]) 写队列,timeout等待时间 Queue.queue.clear() 清空队列

tast_done() 意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)

程序

以上介绍了多线程与队列,现在开始实现python多线程程序。

效果

执行结果

#!/usr/bin/python
# -*- coding: UTF-8 -*-


import time
import threading
from queue import Queue


class Worker(threading.Thread):
    def __init__(self, selfManager, name):
        threading.Thread.__init__(self)
        self.work_queue = selfManager.work_queue
        self.name = name
        self.start()


    def run(self):
        while True:
            try:
                if self.work_queue.empty():
                    # print("thread-%s: queue is empty, waiting for task." % self.name)
                    continue
                text = self.work_queue.get(block=True)
                # todo Anything
                print("%s thread-%s: doing task text: %s" % (str(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())), self.getName(), str(text)))

                time.sleep(10)
                self.work_queue.task_done()
            except Exception as e:
                print("thread-%s: task is error: %s" % (self.getName(), str(e)))
                break


class WorkManager:
    def __init__(self, thread_num):
        self.work_queue = Queue()  # 队列对象
        self.threads = []
        self._init_thread_pool(thread_num)


    def _init_thread_pool(self, thread_num):
        """初始化线程"""
        for name in range(thread_num):
            self.threads.append(Worker(self, str(name)))


    def add_job(self, job):
        """初始化工作队列"""
        self.work_queue.put(job)


if __name__ == '__main__':
    work_manager = WorkManager(5)
    for i in range(100):
        work_manager.add_job(i)