# 异步快速入门
# 为什么需要异步?
普通的函数大多是串行执行的,但是现代计算机其实可以同时做很多事情,例如硬盘读写,网络请求和 CPU 计算。这些操作背后使用的是独立的硬件资源,例如:发送网络请求后等待回应的这段时间并不妨碍 CPU 进行其他计算任务。除此之外,随着多核计算机已经普及,我们也可以使用多个 CPU 核心来同时进行计算。在这个背景下,开发者需要关心如何更高效率地利用硬件资源。
存在很多更高效利用硬件资源的方式,如:多线程、分布式以及异构计算。在本文中,将介绍的是利用异步的方式来优化任务的执行与调度。
对于两个相互独立的任务来说,我们很容易会产生一个想法:如果两个操作使用的是不同的硬件资源,那么它们完全可以同时执行。当我们尝试做这样一件事的时候,我们实际上就是在做异步和并发编程。下图就是对此想法的直观描述:
在这张图中,我们可以观察到以下两个现象:
- 每个任务的执行顺序是不确定的——这是由任务调度决定的
- CPU 的执行时间和 IO 的执行时间存在重叠,从而减少总执行时间
更重要的是,由于 CPU 任务的总计算时间并没有发生变化,我们可以不需要引入更多的计算资源就可以达到更好的性能。这一点和多线程以及多进程是不一样的——异步上的时间优化来源于 CPU 和 IO 任务的调度优化,而多线程或多进程则是引入了更多的计算资源。
接下来,我们将介绍 Julia 异步机制的核心概念:任务。
# 任务
在 Julia 中,任何一个无参函数都可以被封装为一个任务。无参函数指的是可以以f() 形式被调用的函数或对象。
下面,我们将介绍两种典型的任务:CPU 密集型任务与 IO 密集型任务。进一步地,我们将介绍如何使用异步的方式来调度这些任务。
# CPU 密集型任务
function busywait(seconds=5)
# 该函数本身没有实际价值
# 在这里仅仅是构造一个一直占用 CPU 的定时函数,用于演示异步任务的调度
tstart = time_ns()
while (time_ns() - tstart) / 1e9 < seconds
continue
end
return (time_ns() - tstart) / 1e9
end
busywait (generic function with 2 methods)
创建任务使用Task:它接受一个无参函数作为输入(Julia 中函数本身也是对象)。
t = Task(busywait)
Task (runnable) @0x0000024503d48f90
刚被创建的任务对象的状态是runnable,表示该任务可以被执行。这一般是任务刚创建还未开始执行,或者执行到一半被暂停了。
为了让任务开始执行,我们需要调用schedule函数。
schedule会将Task任务加入由 Julia 调度器维护的任务队列,调度器会对队列中的任务做暂停、切换、恢复的异步调度。
schedule(t) # 停顿5秒钟
Task (done) @0x0000024503d48f90
在这里会观察到一个五秒左右的停顿,这是因为它占用了 CPU 五秒钟的时间 —— 不断地重复 busywait 内的 while 循环。
在这五秒钟时间内,Julia 的命令行交互由于被阻塞而无响应。
schedule 会立即返回
值得注意的是, 尽管直接运行schedule可能会得到一个停顿延迟,但schedule本身其实是立即返回的。这一点可以很容易进行验证:
@time schedule(@task busywait(5))
0.000031 seconds (26 allocations: 1.969 KiB)
Task (done) @0x000001df25fa4200
如果需要测量完整的执行时间,则需要使用 wait 函数来等待任务中止:
@time wait(schedule(@task busywait(5)))
5.003431 seconds (15.91 k allocations: 932.914 KiB, 0.07% compilation time)
# IO 密集型任务
busywait是一个典型的 CPU 密集型任务。另一类常见的任务则是 IO 密集型任务,如:文件读写、网络请求、数据库查询等等。
在这里我们使用sleep来模拟一个 IO 密集型任务。
t = @task sleep(5)
Task (runnable) @0x000001df2882fa30
其中 @task sleep(1) 是匿名函数形式 Task(()->sleep(1)) 的简写。这是一个很方便的基于有参数函数构建任务的工具。
如果我们也类似地去调度这个任务,则会发现它与之前的 busywait 并不一样: 它会在调度后立即恢复 Julia 的命令行交互,而不是等待五秒钟。
schedule(t) # 立即返回(但任务还没执行完)
Task (runnable) @0x000001df2882fa30
这背后是因为sleep在操作系统内部实际是一个 IO 等待操作,它并不会真正占用 CPU 时间。
因此,调度sleep任务后,我们可以立即进行其他的计算任务。
同样地可以使用wait函数来等待 IO 任务的完成:
@time wait(schedule(@task sleep(5)))
5.009892 seconds (524 allocations: 34.266 KiB, 0.04% compilation time)
# 并发
至此,我们知道了任务的两种典型类型:CPU 密集型任务和 IO 密集型任务。 当这些任务背后调用的是不同的设备或资源时,就存在通过并发来提升效率的可能性。
function f()
@sync begin
@async sleep(1)
@async busywait(2)
@async sleep(1)
end
return nothing
end
@time f()
3.010870 seconds (16.90 k allocations: 997.930 KiB, 0.22% compilation time)
@async 与 @sync
使用@async expr 等价于运行schedule(@task expr),即先将一段代码转换为Task任务然后立即将其加入调度队列。
在大部分代码中,往往是使用@async而非schedule+@task的形式。
而@sync expr 的作用则是等待之间所有的通过@async启动的任务全部执行完毕。
在上面的代码中,尽管 f 函数中总的等待时间是 4 秒钟,但实际上只需要 3 秒钟就可以完成整个函数的执行。
这是因为第一个 sleep 的等待与第二个 busywait 的计算是同时执行的。
更进一步地,用 for 循环来构建一个版本:
function f_async()
@sync begin
for i in 1:5
@async busywait(2)
@async sleep(1)
end
end
return nothing
end
@time f_async()
11.019724 seconds (16.45 k allocations: 970.086 KiB, 0.05% compilation time)
这里的执行时间是 11 秒钟是因为:存在 5 个完整的 2 秒钟的 CPU 密集型任务,以及最终的一个 1 秒钟的 IO 密集型任务。 中间的 4 个 1 秒钟的 IO 密集型任务则是与 CPU 密集型任务同时执行的。
在这个例子中,尽管我们没有引入更多的计算资源(线程或进程),但是通过更好地构造与编排任务,我们降低了不必要的等待时间,从而提升了总体的执行效率。 这恰恰符合我们关于异步并发编程流程的最初的理解:
作为对比,一个顺序式串行的版本如下(即上图中的上半部分):
function f_sync()
for i in 1:5
busywait(2)
sleep(1)
end
return nothing
end
@time f_sync()
15.046421 seconds (25 allocations: 720 bytes)
到现在,我们已经掌握了如何通过拆分 CPU 密集型任务以及 IO 密集型任务,并通过异步调度来提升程序的执行效率。 但需要注意的是,由于整个环节中并没有引入新的计算资源,因此:对于纯 CPU 密集型的任务来说,仅靠异步本身并不能提升执行效率。
# 任务间通信
至此为止介绍的所有任务,我们都没有使用到它的返回值。但是在实际的应用中,也存在非常多的任务需要返回结果,或者需要在任务之间传递数据。
获取任务的返回值可以通过fetch获得:
t = @async 1 + 1
fetch(t)
2
这件事情本身并不是异步的,因为fetch会阻塞当前的任务(类似于wait),直到t任务执行完毕并返回结果。
相比于 fetch,在异步编程中更普遍的获取结果以及同步数据的策略是使用Channel通道。
Channel通道对象是可以理解为一种先进先出的管道队列,可同时被多个任务并发、安全地读写。这意味这它可以异步地将数据从一个任务发送到另一个任务。
下图是任务之间通过Channel传输数据的示意图:
我们介绍一个生产者和消费者问题的简单示例,生产者和消费者问题中,生产者需要将生产的数据传给消费者用于消耗,这便涉及到协同工作。
Channel通过指定它可以包含的对象的类型和可以持有的最大元素的数量来创建。如果没有指定任何类型,则会使用Any创建。
chnl = Channel(1)# 创建生产者和消费者间数据通信用通道
Channel{Any}(1) (empty)
我们在程序中创建一个大小为 1 的Channel通道,由于通道大小为 1,放入一个数据后若未有任务将值取出,尝试再向通道内放入数据的操作会被堵塞。
然后我们将不同的生产者和消费者用任务创建。
每个生产者会生产让消费者休眠的时间,将生产数据和生产者 ID 放入Channel通道:
function produce(chnl::Channel)
for i in 1:4
@async begin
sleep_time=rand(0:3)# 随机生成0-3s的休眠时间
sleep(rand()+0.5)# 模拟生产者生产时间
put!(chnl,(i,sleep_time))# 放入通道(i,sleep_time)
println("Producer $i ping: $i sleep_time: $sleep_time")
end
end
end
消费者会从Channel通道中取出休眠时间,执行后输出消费者 ID 和收到的数据结果:
function consume(chnl::Channel)
for i in 1:4
@async begin
t = take!(chnl)# 取休眠时间
sleep(t[2])# 执行休眠任务
# 输出消费者ID和收到的数据信息
println("Consumer $i pong: $(t[1]) sleep_time: $(t[2])")
end
end
end
程序的执行结果:
produce(chnl)
consume(chnl)
Producer 2 ping: 2 sleep_time: 3
Producer 4 ping: 4 sleep_time: 3
Producer 1 ping: 1 sleep_time: 2
Producer 3 ping: 3 sleep_time: 1
Consumer 4 pong: 3 sleep_time: 1
Consumer 3 pong: 1 sleep_time: 2
Consumer 1 pong: 2 sleep_time: 3
Consumer 2 pong: 4 sleep_time: 3
可以从程序结果中看到,不同的任务之间通过Channel通道进行数据通信和交互。
我们通过任务的生命周期和通信机制,可以将原本的串行计算使用异步任务的方式执行与协同工作。
# 小结
现在我们总结一下 Julia 中的Task生命周期。在 Julia 中任务的生命周期和支持的 API 如下:
@task或Task会将独立的计算操作封装成可运行的任务对象,可运行的任务对象可用schedule加入调度队列中。
为了便于使用,Julia 提供了另外一个于schedule函数相当的宏@async,可在创建Task对象的同时将其加入调度准备执行。
可使用istaskstarted和istaskdone查询任务状态,调度中或执行完毕的任务可以用fetch获取计算返回值。