编程学习网 > 编程语言 > Python > 真正的Python多线程教程来了
2023
09-12

真正的Python多线程教程来了

32岁的Python依然没有真正的并行性/并发性。然而,这种情况即将发生改变,因为在即将发布的Python 3.12中引入了名为"Per-Interpreter GIL"的新特性。在距离发布还有几个月的时间(预计2023年10月发布),但相关代码已经有了,因此,我们可以提前了解如何使用子解释器API编写真正的并发Python代码。


子解释器

首先,让我们来解释一下如何通过“Per-Interpreter GIL”来解决Python缺乏适当的并发性问题。

在Python中,GIL是一个互斥锁,它只允许一个线程控制Python解释器。这意味着即使在Python中创建多个线程(例如使用线程模块),也只有一个线程会运行。

随着“Per-Interpreter GIL”的引入,各个Python解释器不再共享同一个GIL。这种隔离级别允许每个子解释器可以同时运行。这意味着我们可以通过生成额外的子解释器来绕过Python的并发限制,其中每个子解释器都有自己的GIL(全局状态)。

更详细的说明请参见PEP 684,该文档描述了此功能/更改:https://peps.python.org/pep-0684/#per-interpreter-state

上手体验

安装

为使用这项最新功能,我们必须安装最新版的Python,并且需要从源码上进行构建:

# https://devguide.python.org/getting-started/setup-building/#unix-compiling
git clone https://github.com/python/cpython.git
cd cpython

./configure --enable-optimizations --prefix=$(pwd)/python-3.12
make -s -j2
./python
# Python 3.12.0a7+ (heads/main:22f3425c3d, May 10 2023, 12:52:07) [GCC 11.3.0] on linux
# Type "help", "copyright", "credits" or "license" for more information.

C-API在哪里?

既然已经安装了最新的版本,那我们该如何使用子解释器呢?可以直接导入吗?不,正如PEP-684中所提到的:“这是一个高级功能,专为C-API的一小部分用户设计。”

目前,Per-Interpreter GIL特性只能通过C-API使用,因此Python开发人员没有直接的接口可以使用。这样的接口预计将随着PEP 554一起推出,如果被采纳,则应该会在Python 3.13中实现,在那之前,我们必须自己想办法实现子解释器。

通过CPython代码库中的一些零散记录,我们可以采用下面两种方法:

使用_xxsubinterpreters模块,该模块是用C实现的,因此名称看起来有些奇怪。由于它是用C实现的,所以开发者无法轻易地检查代码(至少不是在 Python 中);

或者可以利用CPython的测试模块,该模块具有用于测试的示例 Interpreter(和 Channel)类。

# Choose one of these:
import _xxsubinterpreters as interpreters
from test.support import interpreters
在接下来的演示中,我们将主要采用第二种方法。我们已经找到了子解释器,但还需要从Python的测试模块中借用一些辅助函数,以便将代码传递给子解释器:

from textwrap import dedent
import os
# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L75
def _captured_script(script):
    r, w = os.pipe()
    indented = script.replace('\n', '\n                ')
    wrapped = dedent(f"""
        import contextlib
        with open({w}, 'w', encoding="utf-8") as spipe:
            with contextlib.redirect_stdout(spipe):
                {indented}
        """)
    return wrapped, open(r, encoding="utf-8")


def _run_output(interp, request, channels=None):
    script, rpipe = _captured_script(request)
    with rpipe:
        interp.run(script, channels=channels)
        return rpipe.read()

将interpreters模块与上述的辅助程序组合在一起,便可以生成第一个子解释器:

from test.support import interpreters

main = interpreters.get_main()
print(f"Main interpreter ID: {main}")
# Main interpreter ID: Interpreter(id=0, isolated=None)

interp = interpreters.create()

print(f"Sub-interpreter: {interp}")
# Sub-interpreter: Interpreter(id=1, isolated=True)

# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L236
code = dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """)

out = _run_output(interp, code)

print(f"All Interpreters: {interpreters.list_all()}")
# All Interpreters: [Interpreter(id=0, isolated=None), Interpreter(id=1, isolated=None)]
print(f"Output: {out}")  # Result of 'print(cur.id)'
# Output: 1
生成和运行新解释器的一个方法是使用create函数,然后将解释器与要执行的代码一起传递给_run_output辅助函数。

更简单点的方法是:

interp = interpreters.create()
interp.run(code)
使用解释器中的run方法。可是,如果我们运行上述代码中的任意一个,都会得到如下错误:

Fatal Python error: PyInterpreterState_Delete: remaining subinterpreters
Python runtime state: finalizing (tstate=0x000055b5926bf398)
为避免此类错误发生,还需要清理一些悬挂的解释器:

def cleanup_interpreters():
    for i in interpreters.list_all():
        if i.id == 0:  # main
            continue
        try:
            print(f"Cleaning up interpreter: {i}")
            i.close()
        except RuntimeError:
            pass  # already destroyed

cleanup_interpreters()
# Cleaning up interpreter: Interpreter(id=1, isolated=None)
# Cleaning up interpreter: Interpreter(id=2, isolated=None)
线程

虽然使用上述辅助函数运行代码是可行的,但使用线程模块中熟悉的接口可能更加方便:

import threading

def run_in_thread():
    t = threading.Thread(target=interpreters.create)
    print(t)
    t.start()
    print(t)
    t.join()
    print(t)

run_in_thread()
run_in_thread()

# <Thread(Thread-1 (create), initial)>
# <Thread(Thread-1 (create), started 139772371633728)>
# <Thread(Thread-1 (create), stopped 139772371633728)>
# <Thread(Thread-2 (create), initial)>
# <Thread(Thread-2 (create), started 139772371633728)>
# <Thread(Thread-2 (create), stopped 139772371633728)>
我们通过把interpreters.create函数传递给Thread,它会自动在线程内部生成新的子解释器。

我们也可以结合这两种方法,并将辅助函数传递给threading.Thread:

import time

def run_in_thread():
    interp = interpreters.create(isolated=True)
    t = threading.Thread(target=_run_output, args=(interp, dedent("""
            import _xxsubinterpreters as _interpreters
            cur = _interpreters.get_current()

            import time
            time.sleep(2)
            # Can't print from here, won't bubble-up to main interpreter

            assert isinstance(cur, _interpreters.InterpreterID)
            """)))
    print(f"Created Thread: {t}")
    t.start()
    return t


t1 = run_in_thread()
print(f"First running Thread: {t1}")
t2 = run_in_thread()
print(f"Second running Thread: {t2}")
time.sleep(4)  # Need to sleep to give Threads time to complete
cleanup_interpreters()
这里,我们演示了如何使用_xxsubinterpreters模块而不是test.support中的模块。我们还在每个线程中睡眠2秒钟来模拟一些“工作”。请注意,我们甚至不必调用join()函数等待线程完成,只需在线程完成时清理解释器即可。

Channels

如果我们深入研究CPython测试模块,我们还会发现有 RecvChannel 和 SendChannel 类的实现,它们类似于 Golang 中的通道(Channel)。要使用它们:

# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test_interpreters.py#L583
r, s = interpreters.create_channel()

print(f"Channel: {r}, {s}")
# Channel: RecvChannel(id=0), SendChannel(id=0)

orig = b'spam'
s.send_nowait(orig)
obj = r.recv()
print(f"Received: {obj}")
# Received: b'spam'

cleanup_interpreters()
# Need clean up, otherwise:

# free(): invalid pointer
# Aborted (core dumped)
这个例子展示了如何创建一个带有receiver(r)和sender(s)端的通道。我们可以使用send_nowait将数据传递给发送方,并使用recv函数在另一侧读取它。这个通道实际上只是另一个子解释器 - 所以与之前一样 - 我们需要在完成后进行清理。

深入挖掘

最后,如果我们想要干扰或调整在C代码中设置的子解释器选项,那么可以使用test.support模块中的代码,具体来说就是run_in_subinterp_with_config:

import test.support

def run_in_thread(script):
    test.support.run_in_subinterp_with_config(
        script,
        use_main_obmalloc=True,
        allow_fork=True,
        allow_exec=True,
        allow_threads=True,
        allow_daemon_threads=False,
        check_multi_interp_extensions=False,
        own_gil=True,
    )

code = dedent(f"""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur)
            """)

run_in_thread(code)
# Interpreter(id=7, isolated=None)
run_in_thread(code)
# Interpreter(id=8, isolated=None)
这个函数是一个Python API,用于调用C函数。它提供了一些子解释器选项,如own_gil,指定子解释器是否应该拥有自己的GIL。

总结

话虽如此——也正如你所看到的,API调用并不简单,除非你已具备C语言专业知识,并且又迫切想要使用字解释器,否则建议还是等待Python 3.13的发布。或者您可以尝试extrainterpreters项目,该项目提供更友好的Python API以便使用子解释器。

以上就是真正的Python多线程教程来了的详细内容,想要了解更多Python教程欢迎持续关注编程学习网。

扫码二维码 获取免费视频学习资料

Python编程学习

查 看2022高级编程视频教程免费获取