协程draft

    5587
    最后修改于

    协程是一个很宽泛的话题。简单的说,是操作系统无法感知到的用户态线程。

    本想找着 wiki 一顿摘抄,但是想了想还是应该从为什么出发。

    What We Have?#

    在没有协程之前(协程没有得到关注前),要实现并发,通常是基于线程的。
    起初在互联网还没有覆盖如此大的返回,并发数量并不是一个很大的问题。以后台服务器为例。
    当一个请求进入时,可以新开一个线程。这是教科书上常常介绍的模式。
    但是随着并发增大,线程数增多,发现原来线程也并不便宜。
    每个线程都要申请内存(典型的大小如 256KB)。
    而且存在大量线程时,线程的上下文切换也会成为一笔昂贵的开销。

    What We Want?#

    我们需要一种更廉价的并发处理机制。能够轻松的处理 C10K 这样的问题。

    How To Solve?#

    通过线程池和事件循环机制,并发的问题在一定程度上缓解了这种问题。
    可以有效的缓解线程创建时的开销,以及线程在进行 I/O 操作时的阻塞问题。

    但是上下文的频繁切换的开销却是没有得到很好的解决。

    我们需要一种这样的机制。

    1. 不直接依赖操作系统的线程,但是具有类似于线程的随时暂停,随时原地恢复的能力。
    2. 创建和销毁的代价要比线程小。
    3. 切换的开销要非常小。

    于是协程重新受到大量关注。

    实现思路#

    暂停 / 重入#

    首先要考虑的问题是就是如何暂停,如何恢复

    随时暂停随时恢复的能力看起来非常厉害,理所当然先看看 CPU 是如何切换线程的。这包括用户 / 内核态切换,页表、硬件寄存器的处理等等。

    但是对于用户态的协程,不需要考虑这些,因为是纯用户态的,与硬件无关,与操作系统无关,只需要一个状态记住当前协程执行的位置,再次进入时在这个位置继续执行就好了。

    接下来要考虑的问题就是

    1. 状态存储在哪里
    2. 如何重新进入
    状态存储在哪里#

    这分为两个部分。第一个是协程内部的状态,如局部变量。第二是与协程自身恢复 / 暂停有关的状态。

    对于协程的内部状态,有两种处理思路:

    1. 既然协程有调用者,那么状态可以存储在调用者的堆栈中,也就是无栈协程
    2. 单独为协程开辟一段内存,将协程的状态存储在堆栈中,也就是有栈协程

    对于第一种思路,将状态通过闭包这样的机制保存在调用者中,这种情况下就没有额外开销。
    对于第二种思路,开辟的内存大小也不会太大。
    在这两种情况下,创建一个协程的开销都不会太大

    对于无栈协程来说,因为其状态存储在调用者的堆栈中,会导致 stackTrace 非常不清晰。

    对于协程自身的状态,则可以这样看待,协程是被调度者,就如同进程具有 PCB(Process Control Block)一样。协程也会对应一个数据结构,就像进程调度中用 PCB 来表示进程类似,在 Golang 中就是一个名为 Goroutine 的结构体,在 Python 中就是 Task。交由调度器进行调度。

    如何暂停 / 重入#

    一种朴素的思路是将代码分块,切分为几个部分,在编译时就清楚会切分成哪些部分,在对应的位置插入对应的出让控制权指令。

    为每个部分打上标签。

    当协程开始执行,到达一个特定点的时候,通过类似 yield 的方式主动让出控制权。

    当协程再次启动时,根据标签决定从哪个代码段继续执行。

    这通常是基于一种假设,协程经常直接或间接调用一些指令,这些指令是值得出让控制权的(比如阻塞 I/O)。

    通常使用状态机来实现,也就是 generator/continuation 模式。比如在 kotlin,python 中都是基于这种方式实现的。

    loading...

    尽管 golang 不是基于状态机的协程实现,但是 golang 对系统调用封装中也包含了类似的主动出让控制权的逻辑,只是出让 / 重入时处理的机制不同。(主动出让,有关于出让时机)

    另外一种思路则是基于 PC 指针等一细节状态寄存器的暂停 / 重入,程序可以在任意位置停止。因此除了上面所说的在代码中主动出让控制权之外,还可以被动通过信号的方式被剥夺控制权。类似于操作系统的调度模式,利用 POSIX like 的信号机制,通过这种方式强制暂停协程。(在 golang 中,1.14+ 版本支持通过 SIGURG 信号强制暂停协程执行)

    当外部发出信号,CPU 执行完一条指令之后,监测到信号,就跳出到对应的信号处理程序。在这个处理程序中保存协程的状态,保存程序当前的 PC 指针。

    当再次启动协程时,根据对应的状态值从特定的位置开始执行。

    在上述的两种暂停 / 重入思路中,也隐含着描述了切换的开销。
    无论哪一种,相比于系统级线程的切换开销,都少了内核态切换、硬件上下文处理等繁杂事情。
    因此协程的切换开销也是十分小的。

    调度机制#

    但是这就是协程吗?似乎有点太简单了?毕竟上面只是讲了协程的实现思路,所以不妨再延伸(还原)一些。

    从上面的介绍来说,我们将协程看做一个可以暂停(挂起)/ 恢复的函数。

    但是协程本身的挂起和恢复没有太大的意义。我们在意的是协程的挂起和恢复能够节省出 CPU 时间。这需要从 IO 中来。因此一个协程在等待 IO 时,我们就可以让它暂停,让别的协程上。我可以启动大量的协程,轮流使用 CPU,就可以尽可能的压榨出 CPU 的时间。所以需要一个机制来管理这样的协程,在合理的时机将协程挂起和恢复。也就是协程调度机制。

    在 Golang 中是基于 GMP 模型的 go Scheduler。

    在 NodeJS 中则是事件循环,事件循环 + 异步回调,看起来与协程似乎不沾边。但是如果跟 kotlin 的 coroutine 进行对比的话,会发现两者实际上是类似的,async/await 这样的语法糖转为 Promise,然后作为回调进入事件循环被调度。

    async/await 底层的 generator 本身就是一种协程(可以暂停 / 重入)。加上 NodeJS 的 eventloop 调度,我认为是很典型的 stackless 协程实现了。
    少数见到相关讨论的比如 Coroutine Event Loops in Javascript (x.st)
    Edit: lorenzofox blog | Coroutines in Javascript

    Golang GMP#

    在 Golang 中,以 G(Goroutine) 代表协程,以 M(Machine) 代表内核线程,负责实际执行 G,以 P (Processor) 代表协程处理者它维护一个处于 Runnable 状态的 G 队列。每个 M 绑定到一个 P (一个 P 可以在多个 M 间切换)从 P 中获取 G 进行执行。

    loading...

    一般来说,保持 G/M 一对多的关系即可,但是 Go 1.1 引入了新的 P 概念。因为可以减少 G 全局队列锁的影响,具体可以参见 Scalable Go Scheduler Design Doc | Google Doc

    GMP 调度流程大致如下:

    1. M 为了运行 G,需得先获取 P。
    2. M 从 P 的本地队列(Local Run Queue)获取 G。
    3. 若 LRQ 中没有可运行的 G,M 会尝试从全局队列 (GRQ) 获取 G 放入 LRQ,
    4. 若 GRQ 中没有 G,M 会从其他 P 的 LRQ 窃取一半的 G 放入自己绑定的 P 的 LRQ。
    5. 获取到可执行的 G 后,M 运行 G,执行完毕重复 2. 操作
      在这种调度机制中,协程之间平等,调度机制不会特意去区分协程之间的父子关系,因此被称为对称协程
    JS Event Loop(NodeJS)#

    NodeJS 中的事件循环分为几个阶段

    	┌───────────────────────────┐
    ┌──>│           timers          │
    │	└─────────────┬─────────────┘
    │   ┌─────────────┴─────────────┐
    │   │     pending callbacks     │
    │   └─────────────┬─────────────┘
    │   ┌─────────────┴─────────────┐
    │   │      idle, prepare        │
    │   └─────────────┬─────────────┘       ┌───────────────┐
    │   ┌─────────────┴─────────────┐       │   incoming:   │
    │   │            poll           │ <─────┤ connections,  │
    │   └─────────────┬─────────────┘       │ data, etc.    │
    │   ┌─────────────┴─────────────┐       └───────────────┘
    │   │           check           │
    │   └─────────────┬─────────────┘
    │   ┌─────────────┴─────────────┐
    └───┤        close callbacks    │ 
        └───────────────────────────┘
    

    首先,JS 执行主函数,执行完毕,调用栈为空后,就开始进入事件循环的处理阶段,首先处理每个阶段的的事情,然后逐个执行任务队列中的任务,直到队列为空或达到最大数量限制。在事件循环进入下一阶段之前,会立即处理微任务队列。在每个循环之间,检查是否存在等待中的异步 IO(如 socket 监听)、计时器,如果没有就退出循环。

    • timers: 该阶段执行由 setTimeoutsetInterval 注册的(已超时)任务。
    • pending callback:执行一些被挂起的回调(如在 poll 阶段处理 IO 回调时出现了错误,触发 error 回调,比如socket.on('error', handler)
    • idle, prepare:nodejs 内部使用(libuv),对外部不可见
    • poll:轮询阶段,分为两个部分。一是计算超时时间,二是依据超时等待。
      • 首先计算轮训的超时时间,超时时间可能为 0(比如存在 setImmediate 回调)、也可能是最短的计时器超市时间(如注册 100,200ms 的计时器,超时时间可能为 100ms)。
      • 已超时时间为准,等待新的 IO 事件,如果新的 IO 事件到达,立即处理,直到对应的队列为空。
      • 当队列为空时,检查计时器,将超时的回调添加到 timers 阶段的队列中,如果存在超时的 timer,就进行下一阶段,如果check 队列不为空,就进行check阶段、否则可以直接
    • check:该阶段执行由 setImmediate 注册的任务。
    • close callback:一些关闭回调,如 socket.on('close', handler)

    该图省略了一些细节如(pending callback),但是可以很好的描述微任务队列与其他队列的关系。

    那么换个视角,对于下面这样一段异步代码。

    js
    async function main() {
    	console.log('part1')
    	const foo = await fetch('foo')
    	console.log('part2')
    	const bar = await fetch('bar')
    	console.log('part3')
    }
    
    main()
    

    当执行到const foo = await fetch('foo')时,会进行 IO 操作,IO 完成时,

    js
    console.log('part2')
    const bar = await fetch('bar')
    console.log('part3')
    

    这部分被当作回调,放入 MicroTaskQueue。

    如果我们换一种视角 (如同 kotlin 的 suspend)

    js
    async function main() {
    	console.log('part1')
    	const foo = await fetch('foo') // suspend here
    	console.log('part2')
    	const bar = await fetch('bar')
    	console.log('part3')
    }
    

    把放入 MicroTaskQueue 的回调当作是被挂起的 main()函数,随后执行回调时就是重入main函数并在对应位置继续。那么这就很符合无栈协程的定义了。

    缺陷#

    似乎协程非常的优秀,但是凡事都有两面性。

    coroutine 同样带有并发编程的通病,其 callstack 往往十分混乱,一方面无法做到很好的 stackTrace,难以调试,另外一方面也难以做好错误处理。

    比如在下面这段 kotlin 代码中,当协程被实际挂起前(调用delay(1000L)),调用栈大概是这样的[main, A, B, C]

    但是当 delay 执行结束后,调用栈就变成了[main, C],随着C,B,A调用完成,调用栈就依次为 [main, B][main, A][main]

    kotlin
    fun main() {
        try {
            runBlocking { A() }  
        }catch (e:Exception){  
            e.printStackTrace()  
        }
    }
    suspend fun A() { B() }  
    
    suspend fun B() { C() }
    
    suspend fun C() {
        // throw Exception("Before delay")
        delay(1000L)
        // throw Exception("After delay")  
    }
    

    疑惑

    令我疑惑的是,在 python 中,无论是挂起前还是挂起后,调用栈似乎可以正常保存。
    task1sleep -> await 的一行yield self中,直接就将控制权流转给了另外的一个协程task2

    python
    async def main():  
        task1 = asyncio.create_task(a("task1"))  
        task2 = asyncio.create_task(a("task2"))  
        await asyncio.gather(task1, task2)  
      
      
    async def a(t):  
        await b(t)
      
      
    async def b(t):  
        await c(t)  
      
      
    async def c(t):  
        print(f"Before delay {t}")
        traceback.print_stack()
        await asyncio.sleep(1)
        print(f"After delay {t}")
        traceback.print_stack()
      
      
    if __name__ == "__main__":  
        asyncio.run(main())
    
    仍然持有的疑问#

    什么叫 "有栈协程在任意函数调用层级的任意位置进行挂起"?

    这句话最开始是在各种介绍协程的 “教程” 或 “博客” 里面看到的。后续在查看各种资料时,也能看到。比如 Revisiting coroutines

    This description of coroutines corresponds to the common perception of the concept, but leaves open relevant issues with respect to a coroutine construct. Apart from the capability of keeping state, we can identify three main issues that distinguish different kinds of coroutine facilities:

    1. the control-transfer mechanism, which can provide symmetric or asymmetric coroutines;
    2. whether coroutines are provided in the language as first-class objects, which can be freely manipulated by the programmer, or as constrained constructs;
    3. whether a coroutine is implemented as a stackful construct, that is, whether it is able to suspend its execution from within nested calls.
    1. 协程是否以堆栈结构实现,即它是否能够在嵌套调用中暂停执行

    这句话如果单看有栈协程很好理解,即可以在任意点被暂停,但是对比来看,这也在是在说无栈协程不可以任意函数调用层级任意位置进行挂起。

    这句话有两个部分,一是任意层级,一是任意位置。

    不可以在任意位置挂起很好理解,因为无栈协程底层是通过状态机实现的,所以出让控制权的位置是固定的(yield),而有栈协程除了主动埋点出让控制权外,还可以像 golang 那样用 runtime 进行抢占,位置不固定。

    但是说无栈协程不可以在任意层级挂起则有点难理解。

    Introduction - 1.85.0 (boost.org) 中也提到:

    With a stackless coroutine, only the top-level routine may be suspended. Any routine called by that top-level routine may not itself suspend.

    无栈协程只能在顶级例程 (routine) 被挂起,该顶级例程调用的任何例程本身都不能暂停。

    c++ - What does it mean for "With a stackless coroutine, only the top-level routine may be suspended." - Stack Overflow

    在这个回答中,提到

    In each invocation of co_await, only the top level coroutine is suspended. To suspend a lower level, that level must suspend itself explicitly. And at that point, it is now the current "top level".

    按照这种方式来理解,似乎是因为 a,b,c 每一层都显式含有 await,每一层都需要在自己身为调用栈顶的时候,显式 await,才算进行挂起。

    kotlin 的文档中,提到 kotlin 虽然是无栈协程,但是可以在任意层级进行挂起。从上面的分析来看,也是可以接受的。

    而在上面 python 的示例中,task1 显然也是实际在 c 处进行挂起的。
    甚至相比于 kotlin,在挂起并恢复后,仍然保留协程调用栈的信息,而 kotlin 却丢失了这样的信息。

    但是与 python 相比,kotlin 只是不用显式加入 await,我理解suspend只是 python 这种的async/await的一个语法糖,只要可能,就都 await。底层都是 continuation/generator 的状态机。

    那么说 kotlin 可以在任意层级进行挂起是否有失偏颇。

    而且在 revisiting coroutine 后面,明确指出,python 中,只有当其控制栈处于与创建时相同的层级时,它才能暂停执行。

    Despite constituting a firstclass object, a Python generator is not a stackful construct; it can only suspend its execution when its control stack is at the same level that it was at creation time. In other words, only the main body of a generator can suspend.

    一些参考资料#

    Revisiting Coroutines .pdf

    There's nothing about stackless coroutines that means you can't have good stack ... | Hacker News (ycombinator.com)

    CppCon 2018: G. Nishanov “Nano-coroutines to the Rescue! (Using Coroutines TS, of Course)” (youtube.com)

    Beautiful ideas in programming: generators and continuations (hhyu.org)

    白话并发模型和异步编程范式・Joey's Tech Notes & Blogs (code2life.top)

    破解 Kotlin 协程(13):协程的几类常见的实现 | Benny Huo

    看了很多关于协程 “回答 / 教程 / 博客”,都提到有栈协程可以在任意函数调用层级的任意位置进行挂起,有栈协程可以这样很好理解。

    Revisiting coroutines 是这样区分有栈无栈的

    whether a coroutine is implemented as a stackful construct, that is, whether it is able to suspend its execution from within nested calls.

    这也就是说无栈协程不支持在任意函数调用层级进行挂起。

    Introduction - 1.85.0 (boost.org) 中也提到:

    With a stackless coroutine, only the top-level routine may be suspended. Any routine called by that top-level routine may not itself suspend.

    无栈协程只能在顶级例程 (routine) 被挂起,该顶级例程调用的任何例程本身都不能暂停。

    为什么说无栈协程不可以在任意层级挂起?

    结合代码来看

    kotlin
    fun main() {
        try {
            runBlocking { A() }  
        }catch (e:Exception){  
            e.printStackTrace()  
        }
    }
    suspend fun A() { B() }  
    
    suspend fun B() { C() }
    
    suspend fun C() {
        // throw Exception("Before delay")
        delay(1000L)
        // throw Exception("After delay")  
    }
    

    当协程被实际挂起前(调用delay(1000L)),调用栈大概是这样的[main, A, B, C](省略了不那么重要的部分)。

    但是当 delay 执行结束后,调用栈就变成了[main, C],随着C,B,A调用完成,调用栈就依次为 [main, B][main, A][main]

    kotlin 的文档中,提到 kotlin 虽然是无栈协程,但是可以在任意层级进行挂起。从这个示例来看,是可以接受的。

    下面是一段 python 的代码

    python
    async def main():  
        task1 = asyncio.create_task(a("task1"))  
        task2 = asyncio.create_task(a("task2"))  
        await asyncio.gather(task1, task2)  
      
      
    async def a(t):  
        await b(t)
      
      
    async def b(t):  
        await c(t)  
      
      
    async def c(t):  
        print(f"Before delay {t}")
        traceback.print_stack()
        await asyncio.sleep(1)
        print(f"After delay {t}")
        traceback.print_stack()
      
      
    if __name__ == "__main__":  
        asyncio.run(main())
    

    在上面 python 的示例中,task1 实际也是在 c 处的 sleep 进行挂起的。甚至相比于 kotlin,在挂起并恢复后,仍然保留协程调用栈的信息 ([main,a,b,c]),而 kotlin 却丢失了这样的信息。

    这两者都与上面描述的无栈协程不支持在任意函数调用层级进行挂起不符合。

    所以这是第一个问题:什么是在任意函数调用层级进行挂起

    针对这个问题也进行了一些搜索,只有少数几个回答提到了这个:什么是在任意函数调用层级进行挂起?

    c++ - What does it mean for "With a stackless coroutine, only the top-level routine may be suspended." - Stack Overflow

    在这个回答中,提到

    In each invocation of co_await, only the top level coroutine is suspended. To suspend a lower level, that level must suspend itself explicitly. And at that point, it is now the current "top level".

    按照这种方式来粗略理解,似乎是因为 a,b,c 每一层都显式含有 await,每一层都需要在自己身为调用栈顶的时候,显式 await,才算进行挂起。

    只能在连续的 async func 调用中挂起。

    但是与 python 相比,我理解suspend只是 python 这种的async/await的一个语法糖,kotlin 只是不用显式加入 await,只要可能,就都 await。其底层都是 continuation/generator 的状态机。

    如果是这样理解在任意函数调用层级进行挂起,那么说 kotlin 所说可以在任意层级进行挂起是否有失偏颇?

    回答:

    无栈协程当中之所以要有 async func 这个东西正是因为不能在任意调用中挂起,如果可以的话,那只要统一成 func 就行了。有栈协程中其实是不区分两者的,调用一个普通函数,普通函数内部也可以挂起,但从 async func 当中调用一个 func,func 内部是不可以再调用一个 async func 然后从 async func 里面挂起的,这就是两者机制的不同。编译器会将 async func 编译成使用独立的协程栈的调用方式,看似和普通函数调用相似,其实内部机制完全不同,所以不能混用,只能在连续的 async func 调用中挂起。

    回到这句:

    协程是否以堆栈结构实现,即它是否能够在嵌套调用中暂停执行

    对于无栈协程,编译器会将 async func 编译成使用独立的协程栈的调用方式。对于普通函数则不会。

    如果将async func 和 普通函数视为A,B两种类型。

    对于三个函数async XYasync Z

    假设存在 [X,Y,Z] 的调用链,在理论上,其作为一个协程,可以在 Z 处进行挂起。

    但编译器没办法处理这种情况。

    编译器不允许嵌套形如「A,B,A」这样两种不同的函数。

    从 async func 当中调用一个 func,func 内部是不可以再调用一个 async func 然后从 async func 里面挂起的

    对于一个无栈协程,其调用栈只能是BBB...AAA...BBB 这种类型。挂起时,调用栈只能是BBB...AAA 这样的。

    这就是所谓的不能在任意(嵌套)层级挂起。这也是为什么 async具有传染性(当协程控制与底层实际挂起的操作之间存在多个层级时,会将这部分传染为 async func)。

    从这个角度理解,kotlin 的 suspend 也是具有同样的问题,那么所谓 kotlin 支持在任意层级挂起是有失偏颇的。

    • 🥳0
    • 👍0
    • 💩0
    • 🤩0