协程的学习

自从在知乎上听到”圆胖肿”提起过Project Loom之后,我就对协程有些感兴趣了,一开始是搜的廖雪峰博客,里面用的python做示例,好嘛,yeild这个关键词我一看就头大,于是就撂下了。
后来谷歌宣布kotlin成为安卓开发语言之后,知乎上突然有了一大波人宣传kotlin,我也试了试,mmp,看的好难受啊,什么类型后置;val、var;map、filter省略定义、fun代表方法,我当时心想,这什么玩意啊,就很抵触
去年底对Go感兴趣,多返回值牛逼啊,java的pair、triple太有毛病了,用了半天,还买了个mook的课程,学完了想这啥啊,我用java写这课程,30分钟就完事了好不好,后来用用什么异常机制、包管理机制太反人类,慢慢也写的少了。

但是在这些过程里,协程始终是一个被吹嘘的点,他们都说好用,我也不知道哪好用,他们都说轻量,那大概是真的轻量级把,OK,捡起廖神博客继续来看,没想到这次一下就理解了,yield就是个中断点,返回一个value,下次next调用时候传个新的value,yeild执行时的value就被替换成新value继续了。
OK 我看懂yield和next了,so?这个协程是不是有点不太对,这种机制是挺有意思,但是跟网评上说的”轻量级线程”还是差别挺大的把!

Go的协程其实看起来就像是java里面跑线程:go fun1()就起来了,但是这个简单到我没法理解了,chan又好像就是一个BlockingQueue,但是Go的文章都好像没有对他俩进行对比,看起来可能是用Go的都不太喜欢java把。
现在在学kotlin,正好跑了一遍kotlin的协程手册,我只能说kotlin的手册真的详细太多了,而且还可以看一看设计者的思维。

##Kotlin中的协程

Kotlin中的协程是个实验性质的东西,maven包在这,配合kotlinVersion1.3.50使用:

<dependency>
    <groupId>org.jetbrains.kotlinx</groupId>
    <artifactId>kotlinx-coroutines-core</artifactId>
    <version>1.3.2</version>
</dependency>

什么是Kotlin的协程就不说了,我学Go以来觉得最有用的就是DuckType的理念,你不用管定义,做出来是什么东西的样子和功能,就是什么东西,非要生搬硬套进一个概念名字,可能辞海都装不下这些名词。

主要来看下面几个方向:

1,与”轻量级线程”相似的方向;在这个方向中,我们先运用kotlin协程,再比较它与线程在应用中的不同之处:

  • 协程中支持finally操作,确保即使协程(named thread)被cancel了之后,finally部分也一定会被调用
  • 仅有协程代码块内支持suspend函数的调用
  • 协程中可以使用async包裹异步协程代码块并约定执行时间(立即还是LAZY),返回Deferred,这个Deferred也可以被取消,也可以await,这与CompleteFuture的API类似。
  • 线程由操作系统控制,暴露给JVM;但协程由JVM控制,有一个神奇的例子:同一个协程,可以使用withContext获取到不同的线程上下文。
  • 协程会等待它内部的子协程结束。

2,数据共享的方向。

  • flow:kotlin官方示例中演示了flow,与Reactor模型相似,flow具有emit和collect,以及其他的比如buffer等等,我将它看成是一个语法糖版本的Flow机制实现。
  • 通道:协程中的通道,是具有一种 FIFO 特性的;它与BlocingQueue不太一样的:在一个协程内部,通道send出去后,是不能接受回来的,只有另一个协程才可以收到这个协程发出的信息(这是实验出来的性质,也不太清楚是为什么,就算FIFO也应该能拿回来才对)

##kotlin在线程安全上的语法糖:

线程安全,其实就是原子性,可见性,有序性,官方文档在这里专门提到了,Kotlin中所有的对象都是包装对象,所以volatile是只能保证可见和有序,不能保证原子的。

  • 数据结构这块没有变化,仍然是Java Atom类型
  • 单线程上下文锁定:
 suspend fun massiveRun(action: suspend () -> Unit) {
     val n = 100  // 启动的协程数量
     val k = 1000 // 每个协程重复执行同一动作的次数
     val time = measureTimeMillis {
         coroutineScope { // 协程的作用域
             repeat(n) {
                 launch {
                     repeat(k) { action() }
                 }
             }
         }
     }
     println("Completed ${n * k} actions in $time ms")    
 }

 val counterContext = newSingleThreadContext("CounterContext")
 var counter = 0

 fun main() = runBlocking {
     withContext(Dispatchers.Default) {
         massiveRun {
             // 将每次自增限制在单线程上下文中
             withContext(counterContext) {
                 //这部分单线程执行
                 counter++
             }
         }
     }
     println("Counter = $counter")
 }

fun main() = runBlocking {
    // 将一切都限制在单线程上下文中
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

,java中原始类型的++ --是原子的,包装类型不是
  • 互斥锁Mutex:Synchronize
val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // 用锁保护每次自增
            //withLock就是mutex.lock(); try { …… } finally { mutex.unlock() }的语法糖
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}
  • actor:这个是个非常有意思的东西,说回来它还是个语法糖,官方文档举例时还用了密封类sealed class,搞得很容易看不懂,原代码是这样的:
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求


// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor() // 创建该 actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // 发送一条消息以用来从一个 actor 中获取计数值
    val response = CompletableDeferred<Int>()

    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // 关闭该actor

}

我修改成这样:


fun CoroutineScope.easyActor()=actor<Any>{
    //内部信息innerData
    var innerData=0
    //循环接受事件
    for(msg in channel){
        when(msg){
            //如果是Int事件就innerData加这个值
            is Int ->innerData+=msg
            //Reader对象用来包装返回值
            is Reader -> msg.response.complete(innerData)
        }
    }
}

class Reader(val response: CompletableDeferred<Int>)

fun main() = runBlocking<Unit> {
    val act = easyActor() // 创建该 actor
    withContext(Dispatchers.Default) {
        massiveRun {
            act.send(2)
        }
    }
    // 发送一个Reader包裹返回值,这里需要Reader来包裹,因为范型擦除以后,when那边没法过检查。
    val r=Reader(CompletableDeferred<Int>())
    act.send(r)
    println("Counter = ${r.response.await()}")
    act.close() // 关闭该actor
}

这个协程,学了两天多,感受了一些kotlin的新概念,我现在越发的觉得,新名词学不完,只有学好基本的东西,才能了解各个名词之间的区别,你就看这kotlin的协程文档,哪里有满百度找出来的yield的影子,只有最后一个select-expression章节才提到了这种,然而这个章节是实验性的;本身coroutine就在kotlinx包里了,这个select-expression更实验性,我是有点扛不住了,下次再学。