# 异步生产者消费者示例


Channel通道实现生产者消费者问题

阅读提示

当你在阅读具体示例之前,建议先阅读(异步快速入门)来了解相关的内容背景。

# 多生产者多消费者示例

假设有一个餐厅,有两位主厨(chef)和两位服务员(server)。当顾客点餐后,两位主厨开始做菜;做好一道菜后,一位服务员负责上菜。所有菜做完后,餐厅结束营业。现在,不考虑顾客的参与,编写一个程序来模拟这个餐厅的主厨和服务员的工作情况。

# 系统简要分析

从描述中可以看出,这是一种典型的生产者-消费者模型的应用。在这种模型中,生产者负责产生数据,消费者来消费数据;二者的运作是并发的,要竞争数据的访问权。因此需要对二者进行互斥和共享操作。

从处理的对象来看,主厨面对的对象是订单和做好的菜,服务员面对的是做好的菜。这两项都要面对数据的竞争。这里使用 Julia 中的 Channel通道来处理并发任务。

定义厨师,服务员,菜肴三个结构体:

struct CHEF
    name::String
end

struct SERVER
    name::String
end

struct COURSE
    table_id::Int
    course::String
end

流程如下:

据此定义对应的cookserve方法:

function cook(chef::CHEF, order_chnl, cook_chnl::Channel, cv, job_done_chnl)
    while isready(order_chnl) # 订单不空就继续
        course = take!(order_chnl) # 取订单
        println("chef ", chef.name, " is cooking ", course, " for table ", course.table_id)
        sleep(1) # 烹饪耗时
        println("chef ", chef.name, " cooked ", course, " for table ", course.table_id)
        put!(cook_chnl, course) # 送给上菜端
    end
    println(chef.name, " ok")
    notify(cv) # 订单空,退出工作
    return put!(job_done_chnl, 0) # 通知服务员菜单已结束
end
function serve(server::SERVER, cook_chnl::Channel, cv, job_done_chnl)
    while (~isready(job_done_chnl)) || (isready(cook_chnl))
        course = take!(cook_chnl)
        println(
            "server ",
            server.name,
            " is serving ",
            course.course,
            " for table ",
            course.table_id,
        )
    end
    println(server.name, " ok")
    return notify(cv) # 订单空且上菜完毕,退出工作
end

对于运行函数,先定义一个简单的订单流,然后定义厨师和服务员,以及对应的响应变量,然后开始运行,wait响应变量直到全部返回,流程结束:

function main()
    # 简化点单流程
    orders = [
        [1, "腌笃鲜"],
        [4, "腌笃鲜"],
        [1, "小炒黄牛肉"],
        [1, "红烧鱼"],
        [1, "鲜蔬汤"],
        [2, "鲜蔬汤"],
        [2, "羊肉红柳串"],
        [3, "西冷牛排"],
        [4, "菲力牛排"],
        [3, "白斩鸡"],
        [3, "红烧意面"],
    ]
    order_ch = Channel{COURSE}(100) # 订单放入channel中
    for i in axes(orders, 1)
        put!(order_ch, COURSE(orders[i][1], orders[i][2]))
    end

    cook_chnl = Channel{COURSE}(2) ## 定义上菜Channel

    james = CHEF("阿杰") # 定义厨师
    linda = CHEF("小娜") # 定义厨师

    zoe = SERVER("阿和") # 定义服务员
    david = SERVER("小叶") # 定义服务员

    Chef_cv = [Condition() for i in 1:2] # 定义厨师们的结束响应变量
    server_cv = [Condition() for i in 1:2] # 定义服务员们的结束响应变量

    job_done_ch = Channel{Int}(2) # 一个厨师告知是否结束的管道

    # 开始工作
    @async cook(james, order_ch, cook_chnl, Chef_cv[1], job_done_ch)
    @async cook(linda, order_ch, cook_chnl, Chef_cv[2], job_done_ch)

    @async serve(zoe, cook_chnl, server_cv[1], job_done_ch)
    @async serve(david, cook_chnl, server_cv[2], job_done_ch)

    # 等待退出
    @sync begin
        for i in Chef_cv
            @async begin
                wait(i)
            end
        end
        for i in server_cv
            @async begin
                wait(i)
            end
        end
    end
    # 结束
    return println("所有工作完成。 餐厅关闭")
end

运行结果如下:

main()
chef 阿杰 is cooking COURSE(1, "腌笃鲜") for table 1
chef 小娜 is cooking COURSE(4, "腌笃鲜") for table 4
chef 阿杰 cooked COURSE(1, "腌笃鲜") for table 1
chef 阿杰 is cooking COURSE(1, "小炒黄牛肉") for table 1
chef 小娜 cooked COURSE(4, "腌笃鲜") for table 4
chef 小娜 is cooking COURSE(1, "红烧鱼") for table 1
server 阿和 is serving 腌笃鲜 for table 1
server 小叶 is serving 腌笃鲜 for table 4
chef 阿杰 cooked COURSE(1, "小炒黄牛肉") for table 1
chef 阿杰 is cooking COURSE(1, "鲜蔬汤") for table 1
chef 小娜 cooked COURSE(1, "红烧鱼") for table 1
chef 小娜 is cooking COURSE(2, "鲜蔬汤") for table 2
server 阿和 is serving 小炒黄牛肉 for table 1
server 小叶 is serving 红烧鱼 for table 1
chef 阿杰 cooked COURSE(1, "鲜蔬汤") for table 1
chef 阿杰 is cooking COURSE(2, "羊肉红柳串") for table 2
chef 小娜 cooked COURSE(2, "鲜蔬汤") for table 2
chef 小娜 is cooking COURSE(3, "西冷牛排") for table 3
server 阿和 is serving 鲜蔬汤 for table 1
server 小叶 is serving 鲜蔬汤 for table 2
chef 阿杰 cooked COURSE(2, "羊肉红柳串") for table 2
chef 阿杰 is cooking COURSE(4, "菲力牛排") for table 4
chef 小娜 cooked COURSE(3, "西冷牛排") for table 3
chef 小娜 is cooking COURSE(3, "白斩鸡") for table 3
server 阿和 is serving 羊肉红柳串 for table 2
server 小叶 is serving 西冷牛排 for table 3
chef 阿杰 cooked COURSE(4, "菲力牛排") for table 4
chef 阿杰 is cooking COURSE(3, "红烧意面") for table 3
chef 小娜 cooked COURSE(3, "白斩鸡") for table 3
小娜 ok
server 阿和 is serving 菲力牛排 for table 4
阿和 ok
server 小叶 is serving 白斩鸡 for table 3
小叶 ok
chef 阿杰 cooked COURSE(3, "红烧意面") for table 3
阿杰 ok
所有工作完成。 餐厅关闭

可以看到,多个厨师和服务员并发有序地工作着。

# 另请参阅