个性化阅读
专注于IT技术分析

Python多线程和多处理教程

本文概述

注意:根据普遍的要求, 我演示了一些替代技术-包括async / await, 仅在Python 3.5出现后才可用-在本文结尾处添加了一些更新。请享用!

批评Python的讨论经常谈论使用Python进行多线程工作有多么困难, 将矛头指向所谓的全局解释器锁(正式称为GIL), 该锁可防止Python代码的多个线程同时运行。因此, 如果你不是Python开发人员, 并且来自其他语言(例如C ++或Java), 则Python多线程模块的行为可能与你期望的不太一样。必须明确的是, 只要考虑到某些因素, 仍然可以用Python编写可同时运行或并行运行的代码, 并在最终性能上产生显着差异。如果你还没有阅读它, 我建议你在srcmini Engineering Blog上浏览Eqbal Quran关于Ruby中的并发和并行性的文章。

在此Python并发教程中, 我们将编写一个小的Python脚本来从Imgur下载最受欢迎的图像。我们将从一个顺序下载图像的版本开始, 或者一次下载一个。作为前提条件, 你将必须在Imgur上注册应用程序。如果你还没有Imgur帐户, 请先创建一个。

这些线程示例中的脚本已使用Python 3.6.4进行了测试。进行一些更改后, 它们也应与Python 2一起运行-urllib是这两个Python版本之间变化最大的地方。

Python多线程入门

让我们开始创建一个名为download.py的Python模块。该文件将包含获取图像列表并下载它们所需的所有功能。我们将这些功能分为三个独立的功能:

  • get_links
  • 下载链接
  • setup_download_dir

第三个功能setup_download_dir将用于创建下载目标目录(如果尚不存在)。

Imgur的API要求HTTP请求带有带有客户端ID的Authorization标头。你可以从在Imgur上注册的应用程序的仪表板中找到此客户端ID, 并且响应将进行JSON编码。我们可以使用Python的标准JSON库对其进行解码。下载图像是一个更简单的任务, 因为你要做的就是通过URL获取图像并将其写入文件。

脚本如下所示:

import json
import logging
import os
from pathlib import Path
from urllib.request import urlopen, Request

logger = logging.getLogger(__name__)

types = {'image/jpeg', 'image/png'}


def get_links(client_id):
    headers = {'Authorization': 'Client-ID {}'.format(client_id)}
    req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
    with urlopen(req) as resp:
        data = json.loads(resp.read().decode('utf-8'))
    return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types]


def download_link(directory, link):
    download_path = directory / os.path.basename(link)
    with urlopen(link) as image, download_path.open('wb') as f:
        f.write(image.read())
    logger.info('Downloaded %s', link)


def setup_download_dir():
    download_dir = Path('images')
    if not download_dir.exists():
        download_dir.mkdir()
    return download_dir

接下来, 我们将需要编写一个模块, 将使用这些功能来一步一步下载图像。我们将其命名为single.py。这将包含我们的第一个Imgur图片下载器的初始版本的主要功能。该模块将在环境变量IMGUR_CLIENT_ID中检索Imgur客户端ID。它将调用setup_download_dir来创建下载目标目录。最后, 它将使用get_links函数获取图像列表, 过滤掉所有GIF和相册URL, 然后使用download_link将每个图像下载并保存到磁盘。这是single.py的样子:

import logging
import os
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    for link in links:
        download_link(download_dir, link)
    logging.info('Took %s seconds', time() - ts)

if __name__ == '__main__':
    main()

在我的笔记本电脑上, 该脚本花费了19.4秒下载了91张图像。请注意, 这些数字可能会根据你所使用的网络而有所不同。 19.4秒还不是很长, 但是如果我们想下载更多图片怎么办?可能是900张图像, 而不是90张图像。平均每张照片0.2秒, 900张图像大约需要3分钟。对于9000张照片, 将需要30分钟。好消息是, 通过引入并发或并行性, 我们可以大大加快此过程。

所有后续代码示例将仅显示新的且特定于那些示例的import语句。为了方便起见, 所有这些Python脚本都可以在此GitHub存储库中找到。

Python中的并发性和并行性:线程示例

线程是实现Python并发性和并行性的最著名方法之一。线程是操作系统通常提供的功能。线程比进程轻, 并且共享相同的内存空间。

Python多线程内存模型

在此Python线程示例中, 我们将编写一个新模块来替换single.py。该模块将创建一个由八个线程组成的池, 从而使包括主线程在内的总共九个线程成为可能。我选择了八个工作线程, 因为我的计算机具有八个CPU内核, 每个内核一个工作线程对于一次运行多少线程来说似乎是一个不错的数字。实际上, 根据其他因素(例如同一台计算机上运行的其他应用程序和服务)会更仔细地选择此数字。

除了我们现在有一个新类DownloadWorker(它是Python Thread类的后代)外, 它与上一个几乎相同。 run方法已被覆盖, 它将运行无限循环。在每次迭代中, 它都会调用self.queue.get()尝试从线程安全队列中获取URL。它会阻塞, 直到队列中有一个要处理的项目为止。一旦工作人员从队列中接收到一项, 它便会调用上一个脚本中使用的相同的download_link方法, 以将图像下载到images目录。下载完成后, 工作程序会通知队列该任务已完成。这非常重要, 因为队列会跟踪已排队的任务数。如果工作人员没有发信号说他们完成了任务, 对queue.join()的调用将永远阻塞主线程。

import logging
import os
from queue import Queue
from threading import Thread
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


class DownloadWorker(Thread):

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                download_link(directory, link)
            finally:
                self.queue.task_done()


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    # Create a queue to communicate with the worker threads
    queue = Queue()
    # Create 8 worker threads
    for x in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit even though the workers are blocking
        worker.daemon = True
        worker.start()
    # Put the tasks into the queue as a tuple
    for link in links:
        logger.info('Queueing {}'.format(link))
        queue.put((download_dir, link))
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    logging.info('Took %s', time() - ts)

if __name__ == '__main__':
    main()

在较早使用的同一台计算机上运行此Python线程示例脚本, 下载时间为4.1秒!这比上一个示例快了4.7倍。尽管这要快得多, 但是值得一提的是, 由于GIL, 整个过程中一次仅执行一个线程。因此, 此代码是并发的, 但不是并行的。仍然更快的原因是因为这是IO绑定的任务。在下载这些图像时, 处理器几乎不费吹灰之力, 并且大部分时间都花在等待网络上。这就是Python多线程可以大大提高速度的原因。只要其中一个线程准备执行某些工作, 处理器就可以在线程之间切换。在Python或任何其他解释语言中使用带有GIL的线程模块实际上会导致性能降低。如果你的代码正在执行CPU限制的任务(例如, 解压缩gzip文件), 则使用线程模块将导致执行时间变慢。对于CPU约束的任务和真正的并行执行, 我们可以使用多处理模块。

尽管事实上的参考Python实现CPython具有GIL, 但并非所有Python实现都如此。例如, IronPython(使用.NET框架的Python实现)没有GIL, 而基于Java的Jython也没有。你可以在此处找到可用的Python实现列表。

相关:srcmini开发人员的Python最佳实践和技巧

Python中的并发性和并行性示例2:生成多个进程

多处理模块比线程模块更容易插入, 因为我们不需要添加Python线程示例之类的类。我们需要进行的唯一更改是在main函数中。

Python多处理教程:模块

要使用多个进程, 我们创建一个多处理池。使用它提供的map方法, 我们会将URL列表传递给池, 池将依次产生八个新进程, 并使用每个进程并行下载图像。这是真正的并行性, 但要付出代价。脚本的整个内存将复制到产生的每个子进程中。在这个简单的示例中, 这没什么大不了的, 但是对于不平凡的程序来说, 它很容易成为严重的开销。

import logging
import os
from functools import partial
from multiprocessing.pool import Pool
from time import time

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    ts = time()
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    download = partial(download_link, download_dir)
    with Pool(4) as p:
        p.map(download, links)
    logging.info('Took %s seconds', time() - ts)


if __name__ == '__main__':
    main()

Python中的并发性和并行性示例3:分配给多个工作程序

尽管线程和多处理模块非常适合在你的个人计算机上运行的脚本, 但是如果你希望在另一台计算机上完成工作, 或者需要扩展到一台计算机上的CPU以外的最大容量, 该怎么办?处理?一个很好的用例是长时间运行的Web应用程序后端任务。如果你有一些长期运行的任务, 则不想在同一台计算机上启动一堆需要运行其余应用程序代码的子流程或线程。这将降低所有用户的应用程序性能。最好的是能够在另一台计算机或许多其他计算机上运行这些作业。

RQ是一个出色的Python库, 它是一个非常简单但功能强大的库。首先, 使用库将函数及其参数加入队列。这使函数调用表示形式变得很烂, 然后附加到Redis列表中。使工作入队是第一步, 但目前仍无能为力。我们还需要至少一名工人来收听该工作队列。

RQ Python队列库模型

第一步是在计算机上安装和运行Redis服务器, 或者访问正在运行的Redis服务器。在那之后, 对现有代码仅进行了一些小的更改。我们首先创建RQ队列的实例, 然后从redis-py库中将其传递给Redis服务器的实例。然后, 我们不只是调用我们的download_link方法, 而是调用q.enqueue(download_link, download_dir, link)。 enqueue方法将一个函数作为其第一个参数, 然后在实际执行作业时将所有其他参数或关键字参数传递给该函数。

我们需要做的最后一步是启动一些工人。 RQ提供了一个方便的脚本来在默认队列上运行工作程序。只需在终端窗口中运行rqworker, 它将启动工作线程在默认队列上侦听。请确保你当前的工作目录与脚本所在的目录相同。如果要侦听其他队列, 可以运行rqworker queue_name, 它将侦听该命名队列。 RQ的伟大之处在于, 只要你可以连接到Redis, 就可以在任意数量的不同机器上运行任意数量的工作程序。因此, 随着应用程序的增长, 扩展非常容易。这是RQ版本的来源:

import logging
import os

from redis import Redis

from rq import Queue

from download import setup_download_dir, get_links, download_link


logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logging.getLogger('requests').setLevel(logging.CRITICAL)
logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)
    q = Queue(connection=Redis(host='localhost', port=6379))
    for link in links:
        q.enqueue(download_link, download_dir, link)

if __name__ == '__main__':
    main()

但是, RQ不是唯一的Python作业队列解决方案。 RQ易于使用, 并且很好地涵盖了简单的用例, 但是如果需要更多高级选项, 则可以使用其他Python 3队列解决方案(例如Celery)。

Python多线程与多处理

如果你的代码受IO限制, 则Python中的多处理和多线程都将为你工作。多处理比线程更容易插入, 但具有更高的内存开销。如果你的代码受CPU限制, 则多处理最有可能是更好的选择-尤其是在目标计算机具有多个内核或CPU的情况下。对于Web应用程序, 当你需要在多台计算机上扩展工作时, RQ将对你更好。

相关:变得更高级:避免Python程序员犯的10个最常见的错误


更新

Python并发功能

自从Python 3.2以来, 原始文章中没有涉及的新内容是current.futures软件包。该软件包提供了另一种在Python中使用并发和并行性的方法。

在原始文章中, 我提到Python的多处理模块比线程模块更容易放入现有代码中。这是因为Python 3线程模块需要子类化Thread类, 并且还需要为线程创建一个Queue来监视工作。

使用current.futures.ThreadPoolExecutor使Python线程示例代码几乎与多处理模块相同。

import logging
import os
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from time import time

from download import setup_download_dir, get_links, download_link

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    links = get_links(client_id)

    # By placing the executor inside a with block, the executors shutdown method
    # will be called cleaning up threads.
    # 
    # By default, the executor sets number of workers to 5 times the number of
    # CPUs.
    with ThreadPoolExecutor() as executor:

        # Create a new partially applied function that stores the directory
        # argument.
        # 
        # This allows the download_link function that normally takes two
        # arguments to work with the map function that expects a function of a
        # single argument.
        fn = partial(download_link, download_dir)

        # Executes fn concurrently using threads on the links iterable. The
        # timeout is for the entire process, not a single call, so downloading
        # all images must complete within 30 seconds.
        executor.map(fn, links, timeout=30)


if __name__ == '__main__':
    main()

现在, 我们已经使用Python ThreadPoolExecutor下载了所有这些图像, 我们可以使用它们来测试CPU绑定的任务。我们可以在单线程, 单进程脚本中创建所有图像的缩略图版本, 然后测试基于多处理的解决方案。

我们将使用Pillow库处理图像的大小调整。

这是我们的初始脚本。

import logging
from pathlib import Path
from time import time

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension.  E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    image = Image.open(path)
    image.thumbnail(size)
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image.save(thumbnail_path)


def main():
    ts = time()
    for image_path in Path('images').iterdir():
        create_thumbnail((128, 128), image_path)
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

该脚本遍历images文件夹中的路径, 并针对每个路径运行create_thumbnail函数。此功能使用”枕头”打开图像, 创建缩略图并保存新的较小图像, 该图像的名称与原始名称相同, 但名称后面附加了_thumbnail。

在160张图片(共3600万张)上运行此脚本需要2.32秒。让我们看看是否可以使用ProcessPoolExecutor加快速度。

import logging
from pathlib import Path
from time import time
from functools import partial

from concurrent.futures import ProcessPoolExecutor

from PIL import Image

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

logger = logging.getLogger(__name__)


def create_thumbnail(size, path):
    """
    Creates a thumbnail of an image with the same name as image but with
    _thumbnail appended before the extension. E.g.:

    >>> create_thumbnail((128, 128), 'image.jpg')

    A new thumbnail image is created with the name image_thumbnail.jpg

    :param size: A tuple of the width and height of the image
    :param path: The path to the image file
    :return: None
    """
    path = Path(path)
    name = path.stem + '_thumbnail' + path.suffix
    thumbnail_path = path.with_name(name)
    image = Image.open(path)
    image.thumbnail(size)
    image.save(thumbnail_path)


def main():
    ts = time()
    # Partially apply the create_thumbnail method, setting the size to 128x128
    # and returning a function of a single argument.
    thumbnail_128 = partial(create_thumbnail, (128, 128))

    # Create the executor in a with block so shutdown is called when the block
    # is exited.
    with ProcessPoolExecutor() as executor:
        executor.map(thumbnail_128, Path('images').iterdir())
    logging.info('Took %s', time() - ts)


if __name__ == '__main__':
    main()

create_thumbnail方法与上一个脚本相同。主要区别是创建ProcessPoolExecutor。执行者的map方法用于并行创建缩略图。默认情况下, ProcessPoolExecutor每个CPU创建一个子进程。在相同的160张图像上运行此脚本需要1.05秒的时间-快了2.2倍!

异步/等待(仅适用于Python 3.5+)

原始文章评论中要求最多的一项内容是使用Python 3的asyncio模块的示例。与其他示例相比, 对于大多数人来说, 有些新的Python语法可能是新的, 同时也有些新概念。不幸的是, 额外复杂的一层是由于Python的内置urllib模块不是异步的。我们将需要使用异步HTTP库来获得asyncio的全部好处。为此, 我们将使用aiohttp。

让我们直接进入代码, 然后将进行更详细的说明。

import asyncio
import logging
import os
from time import time

import aiohttp

from download import setup_download_dir, get_links

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


async def async_download_link(session, directory, link):
    """
    Async version of the download_link method we've been using in the other examples.
    :param session: aiohttp ClientSession
    :param directory: directory to save downloads
    :param link: the url of the link to download
    :return:
    """
    download_path = directory / os.path.basename(link)
    async with session.get(link) as response:
        with download_path.open('wb') as f:
            while True:
                # await pauses execution until the 1024 (or less) bytes are read from the stream
                chunk = await response.content.read(1024)
                if not chunk:
                    # We are done reading the file, break out of the while loop
                    break
                f.write(chunk)
    logger.info('Downloaded %s', link)


# Main is now a coroutine
async def main():
    client_id = os.getenv('IMGUR_CLIENT_ID')
    if not client_id:
        raise Exception("Couldn't find IMGUR_CLIENT_ID environment variable!")
    download_dir = setup_download_dir()
    # We use a session to take advantage of tcp keep-alive
    # Set a 3 second read and connect timeout. Default is 5 minutes
    async with aiohttp.ClientSession(conn_timeout=3, read_timeout=3) as session:
        tasks = [(async_download_link(session, download_dir, l)) for l in get_links(client_id)]
        # gather aggregates all the tasks and schedules them in the event loop
        await asyncio.gather(*tasks, return_exceptions=True)


if __name__ == '__main__':
    ts = time()
    # Create the asyncio event loop
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Shutdown the loop even if there is an exception
        loop.close()
    logger.info('Took %s seconds to complete', time() - ts)

这里有很多要解压的东西。让我们从程序的主要入口开始。我们使用asyncio模块做的第一件事是获取事件循环。事件循环处理所有异步代码。然后, 循环运行直到完成并传递了main函数。 main的定义中有一个新语法:async def。你还会注意到等待和异步。

PEP492中引入了async / await语法。异步def语法将一个函数标记为协程。在内部, 协程基于Python生成器, 但并不完全相同。协程返回一个协程对象, 类似于生成器返回生成器对象的方式。一旦有了协程, 就可以使用await表达式获得其结果。当协程调用await时, 协程的执行将被挂起, 直到awaitable完成。这种暂停使协程暂停”等待”结果时可以完成其他工作。通常, 此结果将是某种I / O, 例如数据库请求或本例中的HTTP请求。

download_link函数必须进行相当大的更改。以前, 我们依靠urllib来完成为我们读取图像的工作。现在, 为了使我们的方法能够正确地与异步编程范例一起使用, 我们引入了while循环, 一次读取图像的一部分, 并在等待I / O完成之前暂停执行。这使得事件循环可以循环下载不同的图像, 因为每个图像在下载过程中都有可用的新数据。

应该有一种-最好只有一种-显而易见的方法

虽然禅宗的Python告诉我们应该有一种显而易见的方法来做某事, 但是Python中有很多方法可以将并发引入程序中。最好的选择方法将取决于你的特定用例。与线程或多处理相比, 异步范式可以更好地扩展到高并发工作负载(例如Web服务器), 但是它要求你的代码(和依赖项)异步才能完全受益。

希望本文(以及更新)中的Python线程示例将为你指明正确的方向, 这样, 如果你需要在程序中引入并发性, 就可以了解在Python标准库中的位置。

赞(0)
未经允许不得转载:srcmini » Python多线程和多处理教程

评论 抢沙发

评论前必须登录!