一. Kotlin Flow 介绍
Flow — cold asynchronous stream with flow builder and comprehensive operator set (filter, map, etc);
流 - 冷的异步流,具有流生成器和全面的操作符集(过滤器、map等)。
Flow 就是 Kotlin 协程与响应式编程模型结合的产物,你会发现它与 RxJava 非常像。
Flow有以下特点:
1.冷数据流,不消费则不生产,这一点与Channel正相反:Channel的发送端并不依赖于接收端。
2.Flow通过flowOn改变数据发射的线程,数据消费线程则由协程所在线程决定
3.与RxJava类似,支持通过catch捕获异常,通过onCompletion回调完成
4.Flow没有提供取消方法,可以通过取消Flow所在协程的方式来取消
二. Flow 基本使用
runBlocking {
flow {
for (i in 1..6) {
//发射数据
emit(i)
println("emit:$i")
}
}.catch {
//异常处理
println("catch")
}.onCompletion {
//完成回调
println("onCompletion")
}.collect { num ->
// 具体的消费处理
// 只有collect时才会生产数据,原因之后会讲
println("collect:$num")
}
}
输出:
collect:1
emit:1
collect:2
emit:2
collect:3
emit:3
collect:4
emit:4
collect:5
emit:5
collect:6
emit:6
onCompletion
2.1 创建 flow
除了刚刚展示的 flow builder 可以用于创建 flow,还有其他的几种方式:
1.使用flowOf可以定义一组固定的值
fun simple() = flowOf(1,2,3)
2.可以使用 asFlow() 扩展函数将各种集合和序列转换为流。
suspend fun simple() = listOf(1,2,3).asFlow().collect { println(it) }
2.2 切换线程
相比于 RxJava 需要使用 observeOn、subscribeOn 来切换线程,flow 会更加简单。只需使用 flowOn,废话不多说,举个??。
runBlocking {
flow {
for (i in 0..3) {
println("Emit Flow in ${Thread.currentThread().name}")
emit(i)
}
}.flowOn(Dispatchers.IO).map {
println("Map Flow in ${Thread.currentThread().name}")
it * it
}.collect {
println("Collect Flow in ${Thread.currentThread().name}")
println("Result---$it")
}
}
执行结果:
Emit Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Map Flow in main
Collect Flow in main
Result---0
Map Flow in main
Collect Flow in main
Result---1
Map Flow in main
Collect Flow in main
Result---4
Map Flow in main
Collect Flow in main
Result---9
由这个例子可以看出,开发人员可以通过flowOn()改变的是Flow函数内部发射数据时的线程,而在collect收集数据时会自动切回创建Flow时的线程。
Flow的调度器 API 中看似只有flowOn与subscribeOn对应,其实collect所在协程的调度器也与observeOn指定的调度器对应。
对比类型 | Flow | RxJava |
---|---|---|
改变数据发送的线程 | flowOn | subscribeOn |
改变消费数据的线程 | 它自动切回所在协程的调度器 | observeOn |
flowOn只对它上面的代码有效。下面看下一个
runBlocking {
flow {
for (i in 0..3) {
println("Emit Flow in ${Thread.currentThread().name}")
emit(i)
}
}.map {
println("Map Flow in ${Thread.currentThread().name}")
it * it
}.flowOn(Dispatchers.IO).collect {
println("Collect Flow in ${Thread.currentThread().name}")
println("Result---$it")
}
}
执行结果:
Emit Flow in DefaultDispatcher-worker-1
Map Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Map Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Map Flow in DefaultDispatcher-worker-1
Emit Flow in DefaultDispatcher-worker-1
Map Flow in DefaultDispatcher-worker-1
Collect Flow in main
Result---0
Collect Flow in main
Result---1
Collect Flow in main
Result---4
Collect Flow in main
Result---9
这两个例子只是改变了flowOn的位置,map的工作线程就变了。map操作符移动到flowOn上面,map操作就会再flowOn指定的线程操作。
flowOn操作符影响的是上游的操作,而 collect() 指定哪个线程,则需要看整个 flow 处于哪个 CoroutineScope 下。
注意:不允许在内部使用withContext()来切换flow的线程。因为flow不是线程安全的,如果一定要这么做,请使用channelFlow。
2.3 flow异常处理
low的异常处理也比较直接,直接调用 catch 函数即可:
runBlocking {
flow {
emit(1)
throw NullPointerException()
}.catch {
println("catch->$it")
}.collect {
println("collect->$it")
}
}
执行结果:
collect->1
catch->java.lang.NullPointerException
在flow的参数中抛了一个空指针异常,在catch函数中就可以直接捕获到这个异常。如果没有调用catch函数,未捕获的异?;嵩谙咽迸壮觥:蚦ollect一样,catch 函数只能捕获它的上游的异常。
Flow中的catch对应着 RxJava 中的 onError。
当然,还可以使用try{}catch{}块来捕获异常。??如下
runBlocking {
flow {
try {
emit(1)
throw NullPointerException()
} catch (it: Exception) {
println("catch->$it")
}
}.collect {
println("collect->$it")
}
}
输出结果一样,这里就不再展示了。
2.6 flow 完成
如果我们想要在流完成时执行逻辑,可以使用 onCompletion:
runBlocking {
flow {
emit(1)
}.onCompletion {
println("onCompletion")
}.collect {
println("collect->$it")
}
}
执行结果:
collect->1
onCompletion
也可以使用try{}finally{}块在收集完成时执行一个动作。代码和try{}catch{}差不多。
2.5 flow 取消
flow需要在协程里面使用,因为collect是挂起函数,flow基于冷流的特性,不调用collect构建器的代码压根不会走。
所以 flow 在一个挂起函数内被挂起了, flow 才能被取消。
runBlocking {
val f = flow {
for (i in 1..3) {
delay(500)
println("emit $i")
emit(i)
}
}
withTimeoutOrNull(1600) {
f.collect {
delay(500)
println("consume $it")
}
}
println("cancel")
}
执行结果:
emit 1
consume 1
emit 2
cancel
三、Flow 操作符
3.1 过渡流操作符
过渡操作符可以对流进行转换,从上游获取数据做一定更改,然后返给下游。
map
我们可以在 map 中执行一些过渡操作,比如本例中将上游发送的数据*9,然后再发射给下游
runBlocking {
(1..6).asFlow().map {
it * 9
}.collect {
println(it)
}
}
执行结果:
9
18
27
36
45
54
通常我们的代码中有多个接口需要连续调用的时候就很适合用这种方法,可以十分有效的避免接口调用嵌套。
zip
合并两个flow数据流,会分别对两个流合并处理,也就是快的流要等慢的流发射完才能合并。一般用作合并两个网络请求返回数据。
runBlocking {
val nums = (1..3).asFlow().onEach { delay(100) }
val strs = flowOf("one", "two", "three","four").onEach { delay(200) }
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.zip(strs) { a, b -> "$a -> $b" } // 使用“zip”组合单个字符串
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
执行结果:
1 -> one at 274 ms from start
2 -> two at 491 ms from start
3 -> three at 692 ms from start
combine
使用 combine 合并时,每次从 flowA 发出新的 item ,会将其与 flowB 的最新的 item 合并。
runBlocking {
val nums = (1..3).asFlow().onEach { delay(100) }
val strs = flowOf("one", "two", "three","four").onEach { delay(200) }
val startTime = System.currentTimeMillis() // 记录开始的时间
nums.combine(strs) { a, b -> "$a -> $b" }
.collect { value ->
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
执行结果:
1 -> one at 297 ms from start
2 -> one at 312 ms from start
3 -> one at 422 ms from start
3 -> two at 500 ms from start
3 -> three at 703 ms from start
3 -> four at 906 ms from start
filter
filter是过滤操作,看代码
runBlocking {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
执行结果:
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
3.2 转换操作符
transform
它可以用来模仿简单的转换,比如 map 和 filter,也可以实现更复杂的转换。 使用transform,我们可以发出任意次数的任意值。??来啦
runBlocking {
(1..3).asFlow()
.transform {
emit("Making request $it")
delay(1000)
emit("response $it")
}
.collect {
println(it)
}
}
执行结果:
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
3.3 限长操作符
take
在流达到take设置的限制时会将它的执行取消。协程中的取消总是通过抛出异常来执行,所以需要加上try...catch
runBlocking {
flow {
try {
emit(1)
emit(2)
emit(3)
} finally {
println("Finally in numbers")
}
}.take(2)
.collect { println(it) }
}
执行结果:
1
2
Finally in numbers
3.4 末端流操作符
末端流操作符就是下游调用的操作符。比如collect
toList
会把数据消费到转成List
runBlocking {
println((1..9).asFlow().filter { it % 2 == 0 }.toList())
}
执行结果:
[2, 4, 6, 8]
toSet和toList类似
frist
获取第一个元素。如果流为空,则抛出 NoSuchElementException。
runBlocking {
println((1..9).asFlow().filter { it % 2 == 0 }.first())
}
执行结果:
2
reduce
reduce的第一个参数是上次操作的结果,第二个参数是当前需要传入的值。它的返回值类型必须和集合的元素类型相符
runBlocking {
val sum = (1..5).asFlow()
.map { it * it }
.reduce { a, b ->
println("a->$a b->$b")
a + b
}
println("sum->$sum")
}
执行结果:
a->1 b->4
a->5 b->9
a->14 b->16
a->30 b->25
sum->55
fold
fold的返回值类型则不受约束。
runBlocking {
val numbers = listOf(1, 1, 1)
val result = numbers.fold(StringBuilder()) { str: StringBuilder, i: Int ->
str.append(i).append(" ")
}
println("foldResult=$result")
}
执行结果:
foldResult=1 1 1
onEach
上游的每个值向下游发出之前调用onEach操作的流。
runBlocking {
(1..3).asFlow().onEach {
println("onEach->$it")
}.collect {
println("collect->$it")
}
}
执行结果:
onEach->1
collect->1
onEach->2
collect->2
onEach->3
collect->3
3.5 展平流
展平流就是处理Flow<Flow<T>>这种流包含流的这种情况,让它通过不同的方式展开铺平。
flatMapConcat
串行处理数据,展开合并成一个流
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
fun main() = runBlocking<Unit> {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapConcat { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
执行结果:
1: First at 172 ms from start
1: Second at 688 ms from start
2: First at 797 ms from start
2: Second at 1312 ms from start
3: First at 1422 ms from start
3: Second at 1937 ms from start
这个相当于把两个流链接起来了,相当于串联。
flatMapMerge
并发收集所有传入的流,并将它们的值合并到一个单独的流,以便尽快的发射值。
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First")
delay(500) // 等待 500 毫秒
emit("$i: Second")
}
runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..6).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapMerge { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
执行结果:
1: First at 265 ms from start
2: First at 359 ms from start
3: First at 469 ms from start
4: First at 578 ms from start
5: First at 687 ms from start
1: Second at 781 ms from start
6: First at 796 ms from start
2: Second at 875 ms from start
3: Second at 984 ms from start
4: Second at 1093 ms from start
5: Second at 1203 ms from start
6: Second at 1312 ms from start
flatMapLatest
与 collectLatest操作符类似,也有相对应的“最新”展平模式,在发出新流后立即取消先前流的收集。
runBlocking {
val startTime = System.currentTimeMillis() // 记录开始时间
(1..3).asFlow().onEach { delay(100) } // 每 100 毫秒发射一个数字
.flatMapLatest { requestFlow(it) }
.collect { value -> // 收集并打印
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
执行结果:
1: First at 220 ms from start
2: First at 345 ms from start
3: First at 454 ms from start
3: Second at 957 ms from start
四、Flow 背压处理
什么是背压?就是在生产者的生产速率高于消费者的处理速率的情况下出现,发射的量大于消费的量,造成了阻塞,就相当于压力往回走,这就是背压。
Kotlin协程支持背压。Kotlin流程设计中的所有函数都标有suspend修饰符-具有在不阻塞线程的情况下挂起调用程序执行的强大功能。因此,当流的收集器不堪重负时,它可以简单地挂起发射器,并在准备好接受更多元素时稍后将其恢复。
4.1 buffer操作符
buffer参数 | 处理策略 |
---|---|
BufferOverflow.SUSPEND | 设置缓冲区,如果溢出了,则将当前协程挂起,直到有消费了缓冲区中的数据 |
BufferOverflow.DROP_LATEST | 设置缓冲区,如果溢出了,丢弃最新的数据 |
BufferOverflow.DROP_OLDEST | 设置缓冲区,如果溢出了,丢弃最老的数据 |
4.1.1 采用BufferOverflow.SUSPEND
suspend fun flowBackpressureBuffer(overflow: BufferOverflow) {
fun currTime() = System.currentTimeMillis()
var start: Long = 0
val time = measureTimeMillis {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("emit $it (${currTime() - start}ms)")
}
.buffer(capacity = 2, overflow)
.collect {
println("collect $it starts (${currTime() - start}ms) ")
delay(500)
println("collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
执行结果:
emit 1 (115ms)
collect 1 starts (121ms)
emit 2 (234ms)
emit 3 (343ms)
emit 4 (452ms)
collect 1 ends (621ms)
collect 2 starts (621ms)
emit 5 (732ms)
collect 2 ends (1136ms)
collect 3 starts (1136ms)
collect 3 ends (1653ms)
collect 4 starts (1653ms)
collect 4 ends (2156ms)
collect 5 starts (2156ms)
collect 5 ends (2668ms)
Cost 2775 ms
可以看到“emit 4 (452ms)”发射后,发现缓冲区满了,所以就挂起了,当开始“collect 2 starts (621ms) ”的时候,此时 3 和 4 存储在了缓冲区内,此时发射第5个。因为 buffer 的容量是从 0 开始计算的。
4.1.2 采用BufferOverflow.DROP_LATEST
还是上面的代码,只是buffer操作符的第二个参数变成了BufferOverflow.DROP_LATEST 。
执行结果如下:
emit 1 (121ms)
collect 1 starts (126ms)
emit 2 (227ms)
emit 3 (337ms)
emit 4 (448ms)
emit 5 (557ms)
collect 1 ends (636ms)
collect 2 starts (636ms)
collect 2 ends (1137ms)
collect 3 starts (1137ms)
collect 3 ends (1651ms)
Cost 1761 ms
从日志可以看到虽然发射了五个,但是丢弃了后面的4和5。
4.1.3 采用BufferOverflow.DROP_OLDEST
还是上面的代码,只是buffer操作符的第二个参数变成了BufferOverflow.DROP_OLDEST 。
执行结果如下:
emit 1 (117ms)
collect 1 starts (121ms)
emit 2 (223ms)
emit 3 (332ms)
emit 4 (440ms)
emit 5 (550ms)
collect 1 ends (628ms)
collect 4 starts (628ms)
collect 4 ends (1139ms)
collect 5 starts (1139ms)
collect 5 ends (1653ms)
Cost 1749 ms
从日志可以看到虽然发射了五个,但是丢弃了前面的 2和3。
4.2 conflate操作符
confilate 操作符是不设缓冲区,丢弃旧数据。
suspend fun flowBackpressureBuffer(overflow: BufferOverflow) {
fun currTime() = System.currentTimeMillis()
var start: Long = 0
val time = measureTimeMillis {
(1..5).asFlow()
.onStart { start = currTime() }
.onEach {
delay(100)
println("emit $it (${currTime() - start}ms)")
}
.conflate()
.collect {
println("collect $it starts (${currTime() - start}ms) ")
delay(500)
println("collect $it ends (${currTime() - start}ms) ")
}
}
println("Cost $time ms")
}
执行结果如下:
emit 1 (116ms)
collect 1 starts (121ms)
emit 2 (222ms)
emit 3 (329ms)
emit 4 (437ms)
emit 5 (546ms)
collect 1 ends (624ms)
collect 5 starts (624ms)
collect 5 ends (1138ms)
Cost 1252 ms
conflate 操作符是不设缓冲区,也就是缓冲区大小为 0,丢弃旧数据,也就是采取 DROP_OLDEST 策略,那么相当于 buffer(0, BufferOverflow.DROP_OLDEST) 。
看一下源码:
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
public fun <T> Flow<T>.buffer(capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND): Flow<T> {
require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) {
"Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity"
}
require(capacity != CONFLATED || onBufferOverflow == BufferOverflow.SUSPEND) {
"CONFLATED capacity cannot be used with non-default onBufferOverflow"
}
// desugar CONFLATED capacity to (0, DROP_OLDEST)
var capacity = capacity
var onBufferOverflow = onBufferOverflow
**if (capacity == CONFLATED) {
capacity = 0
onBufferOverflow = BufferOverflow.DROP_OLDEST
}**
// create a flow
return when (this) {
is FusibleFlow -> fuse(capacity = capacity, onBufferOverflow = onBufferOverflow)
else -> ChannelFlowOperatorImpl(this, capacity = capacity, onBufferOverflow = onBufferOverflow)
}
}
从源码可以看出,conflate 是 buffer(0, BufferOverflow.DROP_OLDEST) 的一种快捷方式。
4.3 collectLatest操作符
这个操作符只接受最后一个发射。
runBlocking {
flow<Int> {
(1..3).forEach {
delay(100)
emit(it)
println("emit->$it")
}
}.collectLatest {
delay(800)
println("collectLatest->$it")
}
}
执行结果如下:
emit->1
emit->2
emit->3
collectLatest->3
五、StateFlow 和 SharedFlow
5.1 StateFlow
StateFlow 和 LiveData 差不多,都是可观察的数据容器。在 StateFlow 中任何数据的发送,它的每一个接收器都能接收到。
和 LiveData 不同的是, LiveData 不需要初始值,但 StateFlow 需要。
LiveData 会与 Activity 绑定,当 View 进入 STOPED 状态时, LiveData.observer() 会自动取消注册,而从 StateFlow 或任意其他数据流收集数据的操作并不会停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集数据流。
StateFlow 是热流,并不是冷流。并且 StateFlow 的 collect 收不到调用之前发射的数据。
val state = MutableStateFlow(1)
runBlocking {
coroutineScope {
launch {
for (i in 0..10) {
state.emit(i)
delay(1000)
}
}
launch {
delay(2100)
state.collect {
println("receive state $it")
}
}
}
}
执行结果如下:
receive state 2
receive state 3
receive state 4
receive state 5
receive state 6
receive state 7
receive state 8
receive state 9
receive state 10
因为是延时2.1s,所以最后是从 2 开始接收的。
StateFlow 分为 StateFlow 和 MutableStateFlow 。就像 LiveData 和 MutableLiveData 一样。 StateFlow 只能接收数据,不能发送数据,而 MutableStateFlow 即可以发送也可以接收。
5.2 SharedFlow
SharedFlow 和 StateFlow 相比,他有缓冲区区,并可以定义缓冲区的溢出规则,已经可以定义给一个新的接收器发送多少数据的缓存值。MutableSharedFlow 不需要初始值。
当你有如下场景时,需要使用 SharedFlow:
- 发生订阅时,需要将过去已经更新的n个值,同步给新的订阅者。
- 配置缓存策略。
MutableSharedFlow 的参数如下:
- replay 当新的订阅者Collect时,发送几个已经发送过的数据给它
- extraBufferCapacity 减去replay,MutableSharedFlow还缓存多少数据
- onBufferOverflow 缓存策略
- SUSPEND 挂起
- DROP_OLDEST 丢掉最旧值
- DROP_LATEST 丢掉最新值
上代码,
suspend fun simpleSharedFlow() {
val sharedFlow = MutableSharedFlow<Int>(
replay = 5,
extraBufferCapacity = 3,
)
coroutineScope {
launch {
sharedFlow.collect {
println("collect1 received shared flow $it")
}
}
launch {
(1..10).forEach {
sharedFlow.emit(it)
delay(100)
}
}
// wait a minute
delay(1000)
launch {
sharedFlow.collect {
println("collect2 received shared flow $it")
}
}
}
}
执行结果如下:
collect1 received shared flow 1
collect1 received shared flow 2
collect1 received shared flow 3
collect1 received shared flow 4
collect1 received shared flow 5
collect1 received shared flow 6
collect1 received shared flow 7
collect1 received shared flow 8
collect1 received shared flow 9
collect1 received shared flow 10
collect2 received shared flow 6
collect2 received shared flow 7
collect2 received shared flow 8
collect2 received shared flow 9
collect2 received shared flow 10
因为replay = 5,所以collect2收到了5个已经发送的数据。