# 多线程代码生成


代码生成工具支持使用 @threads 宏生成的多线程代码

# Julia @threads 宏介绍

# 概述

Julia 语言的@threads宏是 Julia 多线程并行计算的核心工具之一,通过它用户可以将循环中的任务均匀分配到多个线程上并行执行。

# 基本用法

在启动时,Julia 会根据用户设置的线程数量启动线程池,@threads 宏的基本工作原理是将循环中的迭代任务自动分配给多个线程执行。当执行带有 @threads 宏的循环时, Julia 会将每个线程分配给一部分的循环迭代操作,以实现任务的并行化。本文将简要 @threads 的用法,关于更多的 Julia 多线程编程的内容,请查看 Syslab 帮助文档的多线程快速入门章节。

# 设置线程数

由于 Julia 是在启动前预先分配线程,必须确保 Julia 在启动时设置了线程数量(若没有手动设置,则将默认包含 1 个线程)。Julia 使用环境变量 JULIA_NUM_THREADS 或启动参数 -t 来指定线程数。 例如,若用户需要启用 4 个线程,我们可以通过如下方式在 Syslab 终端中启动 Julia:

julia -t 4

也可以在终端中设置环境变量:

#Windows命令
$env:JULIA_NUM_THREADS=4
#linux命令
export JULIA_NUM_THREADS=4

# 运行代码

假设我们要对两个向量ab进行加法 并将结果存储在c中,使用@threads宏,我们可以将向量的元素加法分配给多个线程来并行执行。

新建parallel_add.jl文件。

using Base.Threads

function main()
    # 定义向量大小和线程的工作内容
    N = 10^2
    a = 1:N
    b = 1:N
    c = zeros(N)

    # 使用 @threads 进行并行向量加法,并打印线程ID
    Threads.@threads for i in 1:N
        c[i] = a[i] + b[i]
        if i % (N / 10) == 0  # 每计算 10% 的数据时输出一次
            thread_id = threadid()
            msg = "线程ID:$thread_id 处理第 $i 个元素: $(c[i])= $(a[i]) + $(b[i])"
            println(msg)
        end
    end

    println("加法操作完成!")
end

@isdefined(SyslabCC) || main()

在终端中输入以下命令来通过多线程模式运行 Julia 代码:

julia -t 8 parallel_add.jl
 #设置了JULIA_NUM_THREADS 后可以直接使用 julia parallel_add.jl

输出结果:

线程ID:7 处理第 70 个元素: 140.0= 70 + 70
线程ID:6 处理第 60 个元素: 120.0= 60 + 60
线程ID:1 处理第 10 个元素: 20.0= 10 + 10
线程ID:4 处理第 30 个元素: 60.0= 30 + 30
线程ID:5 处理第 40 个元素: 80.0= 40 + 40
线程ID:8 处理第 90 个元素: 180.0= 90 + 90
线程ID:2 处理第 80 个元素: 160.0= 80 + 80
线程ID:3 处理第 20 个元素: 40.0= 20 + 20
线程ID:5 处理第 50 个元素: 100.0= 50 + 50
线程ID:8 处理第 100 个元素: 200.0= 100 + 100
加法操作完成!

由于线程调度的原因,打印出来的顺序可能不同。

提示

  • 由于@threads运行时,每个线程的迭代次数是均匀分配的,因此使用@threads时须保证 for 循环每次迭代计算耗时接近,否则将造成任务负载不均匀,影响效率;
  • @threads适合计算密集型任务,不适合 IO 密集型并发任务。

# 代码生成中使用 @thread 宏生成多线程代码

用户可以通过编译参数 --threadpool-size 配置预先分配线程数量,可以根据自己的处理器核心数量选择分配的线程数,默认为 4 个线程。 在 Syslab 终端使用如下命令编译 parallel_add.jl 生成多线程应用程序。

scc parallel_add.jl -o parallel_add_app --threadpool-size 8

运行 app:

 .\parallel_add_app.exe
 # Linux 命令:
 # ./parallel_add_app

运行结果:

线程ID:3 处理第 60 个元素: 120= 60 + 60
线程ID:8 处理第 80 个元素: 160= 80 + 80
线程ID:2 处理第 70 个元素: 140= 70 + 70
线程ID:4 处理第 40 个元素: 80= 40 + 40
线程ID:6 处理第 20 个元素: 40= 20 + 20
线程ID:5 处理第 30 个元素: 60= 30 + 30
线程ID:7 处理第 90 个元素: 180= 90 + 90
线程ID:1 处理第 10 个元素: 20= 10 + 10
线程ID:4 处理第 50 个元素: 100= 50 + 50
线程ID:7 处理第 100 个元素: 200= 100 + 100
加法操作完成!

# 多线程代码生成的局限性

注意

如果 Julia 代码依赖信号量、锁等线程同步功能,则暂时不支持代码生成。 目前,多线程代码生成要求开发者手动避免数据竞争,或者通过使用 Atomic{T} 类型的原子变量来确保线程安全。

# 手动管理数据竞争

如下例子,如果多个线程同时访问变量 sum 进行加法操作,可能会造成数据竞争,使得结果不正确。 新建 race_add.jl 文件,内容如下:

# race_add.jl
using Base.Threads
function main()
    sum::Int64 = 0
    @threads for i in 1:1_000_000
        sum += 1 #不安全的操作
    end
    println(sum) ## 结果是未定义的
end
@isdefined(SyslabCC) || main()

编译并运行:

scc race_add.jl -o race_add --threadpool-size 8
.\race_add.exe
 # Linux 命令:
 # ./race_add

多次运行的结果:

276771
307084
223764
263706
298106
341495
266828

多次运行 race_add 发现结果有可能小于1000000。这是由于多个线程同时访问了 sum ,产生了数据竞争,导致有些递增操作可能被覆盖或丢失。

多个线程同时修改共享变量是非常危险的行为,会导致不确定的结果。在暂时不支持锁的情况下可以用合理的数据分块来解决数据竞争,比如为每个执行线程创建自己独占的缓存,而不是直接写入共享变量。 新建 secure_add.jl 文件, 内容如下:

# secure_add.jl
using Base.Threads
function main()
    ## 最终计算结果
    sum::Int64 = 0
    ## 获取线程池中线程数量
    N = Base.Threads.threadpoolsize()
    ## 为每个线程创建缓存 用于存放自己线程的计算结果
    sum_cache = zeros(Int64, N)
    @threads for i in 1:1_000_000
        ## 获取当前线程id
        thread_id = Base.Threads.threadid()
        ## 计算结果存在当前线程
        sum_cache[thread_id] += 1
    end
    ##在主线程中 处理每个线程缓存的计算结果
    for i in 1:N
        sum += sum_cache[i]
    end
    println(sum) ## 结果是安全的
end
@isdefined(SyslabCC) || main()

编译并运行:

scc secure_add.jl -o secure_add --threadpool-size 8
.\secure_add.exe
 # Linux 命令:
 # ./secure_add

计算结果:

1000000
1000000
1000000
...

这种情况下多次运行的计算结果均正确,因为每个线程只访问了自己独占的缓存,计算完成后由主线程汇集计算结果。

# 使用原子操作

Julia 提供了原子操作,用于在多线程环境下安全地修改共享变量。原子操作是一种特殊的操作,它保证在任何时刻只有一个线程可以执行该操作,且在操作执行完成之前不会被其他线程打断,从而避免了数据竞争。

目前 Syslab 支持使用 Atomic{T} 类型的原子变量的如下函数来实现原子操作。

函数名 功能
atomic_add!(x::Atomic{T}, y::T) 原子地将 y 加到 x 上,并返回原本 x 的值。
atomic_sub!(x::Atomic{T}, y::T) 原子地将 yx 中减去,并返回原本 x 的值。
atomic_cas!(x::Atomic{T}, old::T, new::T) 原子地将 x 的值从 old 替换为 new ,并返回原本 x 的值。
atomic_xchg!(x::Atomic{T}, new::T) 原子地将 x 的值替换为 new,并返回原本 x 的值。
x[] 原子地读取 x 的值,并返回。
x[] = value 原子地将 x 的值设置为 value
atomic_and!(x::Atomic{T}, y::T) 原子地将 xy 进行按位与操作,并返回原本 x 的值。
atomic_or!(x::Atomic{T}, y::T) 原子地将 xy 进行按位或操作,并返回原本 x 的值。
atomic_xor!(x::Atomic{T}, y::T) 原子地将 xy 进行按位异或操作,并返回原本 x 的值。
atomic_nand(x::Atomic{T}, y::T) 原子地将 xy 进行按位与操作,并返回原本 x 的值。
atomic_nor!(x::Atomic{T}, y::T) 原子地将 xy 进行按位或操作,并返回原本 x 的值。
atomic_min!(x::Atomic{T}, y::T) 原子地将 xy 中的较小值设置为 x,并返回原本 x 的值。
atomic_max!(x::Atomic{T}, y::T) 原子地将 xy 中的较大值设置为 x,并返回原本 x 的值。

注意

  • 原子操作仅适用于基本数据类型,如 Int32Float32Bool 等;
  • atomic_add!atomic_sub! 函数,不支持浮点类型原子变量;
  • 使用 win32 线程模式时,只支持 8 字节和 16 字节类型的 atomic_cas! 和基本的读取和写入。

# 示例

新建 atm_add.jl 文件, 内容如下:

using Base.Threads
function main()
    sum::Atomic{Int64} = Atomic{Int64}(0)
    @threads for i in 1:1_000_000
        atomic_add!(sum,1); ## 使用 atomic_add! 保证加法操作的线程安全
    end
    println(sum[]) ## 使用 sum[] load原子变量的值
end
@isdefined(SyslabCC) || main()

编译并运行:

scc atm_add.jl -o atm_add.exe --threadpool-size 8
.\atm_add.exe
# Linux 命令:
# scc atm_add.jl -o atm_add --threadpool-size 8
#./atm_add

运行结果:

1000000