# 多线程快速入门


本例中需要加载的包:

using Base.Threads
using BenchmarkTools

# 引言

异步快速入门中,我们提到了让需求资源不相关的任务同时发生的思想,对于 IO 和计算任务,这能提高硬件资源利用率并节省执行时间。 但仅靠异步本身并不能自动调用多个 CPU 核心——控制流仍是由单个 CPU 调度执行的。

调度更多的 CPU 资源的策略一般有两种:多线程或多进程。多线程是在同一 Julia 进程中创建多个线程,并通过共享内存的方式访问数据。 多进程则是创建多个 Julia 程序,然后通过进程间通信的方式交换数据。 线程和进程最大的区别是:进程间是相互独立的,但同一进程的各线程间会共享进程的资源。

从常见用法来看, Julia 对于多线程计算可以在两个层级上进行:

  • Threads.@threads 宏: 使用@threads自动将 for 循环根据可用线程数均匀划分。 在大多数简单场景下这就足够实现多线程的并行。
  • Threads.@spawn 宏: 将计算任务封装为独立的Task任务,然后在全部可用线程上进行调度。 这种方式可以实现更复杂的多线程并行机制。

本页将介绍 Julia 的这两种多线程使用。

除此之外,由于同一进程内的多个线程之间数据是共享的,在处理多线程算法的时候还需要处理数据竞争现象。 否则不加思考地引入多线程会导致程序出现不可预期的数值错误。本页也会简单介绍数据竞争的现象。

# Julia 多线程的使用

我们需要创建具有多线程的 Julia 进程。而线程数的设定必须在启动 Julia 之前进行,可以在命令行通过 julia -t 4 启动一个具有 4 个线程的 Julia 进程, 也可以在 Syslab 首选项中设置 Julia 进程的线程启动数。

首先需要加载 Base.Threads 模块才能使用多线程相关的函数或宏:

using Base.Threads

在多线程的 Julia 中可以使用nthreads查看线程数目:

nthreads()
4

对于高性能计算任务来说,计算资源的共享会在一定程度上增加上下文切换的开销,从而降低计算效率,因此一般来说数值计算中只使用不超过真实物理核心数目的线程数量。

即使以多线程方式启动,不加改动的 Julia 程序也依旧是默认运行在线程 ID 为 1 的主线程上,我们可以用 threadid 确认这点:

threadid()
1

threadid会返回执行当前语句的线程对应 ID。

# @threads 并行 for 循环

下图对应了 Julia 上的@threads编程模型:

任务均匀划分后对应分配给各线程。

@threads 修饰在 for 循环前会自动并行化 for 循环,但要求 for 循环的计算顺序是相互独立的:

@time @threads for i in 1:10
    sleep(1)
    @show i, threadid()
end
(i, threadid()) = (9, 4)
(i, threadid()) = (7, 3)
(i, threadid()) = (4, 2)
(i, threadid()) = (1, 1)
(i, threadid()) = (10, 4)
(i, threadid()) = (8, 3)
(i, threadid()) = (2, 1)
(i, threadid()) = (5, 2)
(i, threadid()) = (6, 2)
(i, threadid()) = (3, 1)
  3.095430 seconds (143.44 k allocations: 8.086 MiB, 2.04% compilation time)

我们可以看到任务均匀划分在单独线程上运行,而运行时间证明了循环语句各线程并行地执行。

这是一种简单地将串行循环并行化的方法,加上@threads的 for 循环会自动均匀划分给可用线程之间,用户不需要显式地拆分计算或者分配线程。

# Threads.@spawn 创建任务并运行在可用线程上

异步快速入门中介绍了Task任务对象,以及通过@async可以以异步的方式创建并执行一个任务。 在多线程中,Threads.@spawn也会类似地将计算创建为Task任务对象并立即执行。 与@async不同的是,Threads.@spawn创建并调度的任务会在任何可用线程上调度执行,而@async创建的任务只会在主线程上执行。

下图对应了 Julia 上的Threads.@spawn编程模型:

任务被创建然后由调度器分配给可用线程执行。

Threads.@spawn实现@threads的上例,@sync会等待全部执行后再退出,可以看到任务在多个线程上同时执行:

@time @sync for i in 1:10
    Threads.@spawn begin
        sleep(1)
        @show i, threadid()
    end
end
(i, threadid()) = (7, 4)
(i, threadid()) = (5, 3)
(i, threadid()) = (3, 2)
(i, threadid()) = (10, 4)
(i, threadid()) = (9, 1)
(i, threadid()) = (6, 1)
(i, threadid()) = (1, 1)
(i, threadid()) = (8, 2)
(i, threadid()) = (4, 3)
(i, threadid()) = (2, 2)
  1.026771 seconds (9.33 k allocations: 541.373 KiB, 2.05% compilation time)

任务迁移 (task migration)

Threads.@spawn创建的任务对象在运行过程中可能会发生任务迁移(在执行过程中调度到不同线程 ID 的线程上继续执行),故不建议在Threads.@spawn包装的任务中加入与threadid有关的计算依赖。

这可能会因任务迁移导致正确性问题。

如果需要计算中途不改变threadid,建议使用@threads,此时 for 循环内部语句在计算的中途并不会切换到不同的线程 ID 对应的线程上。

对于Task任务对象,我们不能假设它一直在唯一的线程 ID 对应线程上计算。

# 多 CPU 计算

前面讲解了在 Julia 中使用多线程的两种方式,现在来演示多线程调度了更多的计算资源来加速代码:

我们创建一个 10 次循环,每次循环生成一个 100x100 的随机矩阵,并计算其元素的平方和。

function calcu_serial()
    a = zeros(10)
    for i in 1:10
        tmp = zero(Float64)
        A = rand(100, 100)
        for x in eachindex(A)
            tmp += x^2
        end
        a[i] = tmp
    end
end
@btime calcu_serial();
  125.800 μs (21 allocations: 781.86 KiB)

使用@threads来加速这个计算:

function calcu_threads()
    a = zeros(10)
    @threads for i in 1:10
        tmp = 0.0
        A = rand(100, 100)
        for x in eachindex(A)
            tmp += x^2
        end
        a[i] = tmp
    end
end
@btime calcu_threads();
  48.300 μs (42 allocations: 784.31 KiB)

也可以使用Threads.@spawn

function calcu_spawn()
    a = zeros(10)
    @sync for i in 1:10
        Threads.@spawn begin
            tmp = 0.0
            A = rand(100, 100)
            for x in eachindex(A)
                tmp += x^2
            end
            a[i] = tmp
        end
    end
end
@btime calcu_spawn();
  52.000 μs (103 allocations: 789.06 KiB)

不恰当的 Threads.@spawn 使用会导致内存增长

如果Threads.@spawn使用不当,会创建过多的任务,导致过量的内存开销。

如果上例计算中有非常多循环,而循环体内部会生成更大矩阵,如以下计算:

# 循环次数由 1:10 -> 1:100000
for i in 1:100000
    a = zeros(10)
    for i in 1:10
        tmp = zero(Float64)
        # 矩阵size由100x100 -> 1000x1000
        for x in eachindex(rand(1000, 1000))
            tmp += x^2
        end
        a[i] = tmp
    end
end

如果我们继续使用Threads.@spawn为整个循环体计算创建任务,那么会同时创建 100000 个包含rand(1000,1000)矩阵生成的任务,这样很容易因内存占用过多导致程序崩溃卡退。

若需要限制任务数量,可以使用管道 Channel 来限制任务数量。 具体可以阅读异步生产者消费者示例来了解它的实际使用。

我们现在得到一个性能对比:

@btime 测试 时间
CPU 串行 125.800 μs
CPU @threads 4 线程计算 48.300 μs
CPU Threads.@spawn 4 线程计算 52.000 μs

这是在 4 线程下的计算性能,可以看到多线程相对于 CPU 串行的性能提升,这是因为多线程调动了更多的 CPU 资源来参与计算。

# 数据竞争问题

多线程模型中的内存数据是共享的,因此若多个线程同时对同一内存进行读写,就会出现数据竞争问题。 最简单的演示数据竞争现象的例子是对一个变量进行累加操作,我们创建一个累加函数:

function mysum_threads(A)
    rst = 0
    @threads for i in 1:length(A)
        rst += A[i]
    end
    return rst
end

mysum_threads(1:100_000)
1194143424

当我们使用多线程时,每次执行 mysum_threads(1:100_000) 得到的结果都会不一样。

关于数据竞争的进一步介绍以及它的处理方式,可以参见多线程数据竞争

# 总结

我们进行一个总结,在 Julia 下多线程使用和 API 关联概览如下图所示:

使用julia -t 4开启支持 4 个线程计算的 Julia REPL,可以使用nthreads()查询线程数,threadid()查询当前线程 ID。执行时使用@threads并行 for 循环或Threads.@spawn创建分配在不同线程上执行的任务。

最后,我们需要注意到,多线程计算并不是一个免费的计算,如果是很简单的计算,利用多核心计算带来的提升可能并不能平衡线程创建和调度分配的开销。

而且,多线程的共享内存特性虽然使得用户对不同线程间的协同不需要如多进程一般进行重量级的通信,但是多个线程对相同内存的同时读写会导致数据竞争问题。

对于存在数据竞争的计算情景,进一步的了解可以参见多线程数据竞争

用户需要根据计算和代码变更的开销,以及硬件资源情况,选择适合当前计算场景的方法。