Kotlin Flow 三 StateFlow 和 SharedFlow

StateFlow

StateFlow 和 LiveData 差不多,都是可观察的数据容器。在 StateFlow 中任何数据的发送,它的每一个接收器都能接收到。在 StateFlow 和 SharedFlow 中收集器也可以被称为订阅者,不过这个订阅者会挂起当前协程,而且永远不会结束。

private val state = MutableStateFlow(1)

suspend fun simpleStateFlow() {
    coroutineScope {
        launch {
            delay(1000)
            state.collect {
                println("before state value $it")
            }
        }
        launch {
            for (i in 1..100) {
                state.emit(i)
                delay(100)
            }
        }

        launch {
            state.collect {
                println("state value $it")
            }
        }
    }
}

需要注意的是 collect 是一个挂起函数,所以一旦调用 collect 协程就会被挂起,所以上述的例子中在一个协程中发送数据,在两个协程中接收数据。

LiveData 不同的在于, LiveData 不需要初始值,但 StateFlow 需要。

LiveData 会与 Activity 绑定,当 View 进入 STOPED 状态时, LiveData.observer() 会自动取消注册,而从 StateFlow 或任意其他数据流收集数据的操作并不会停止。如需实现相同的行为,需要从 Lifecycle.repeatOnLifecycle 块收集数据流。

StateFlow热流,并不是冷流。并且 StateFlowcollect 收不到调用之前发射的数据。

val state = MutableStateFlow(1)
coroutineScope {
    launch {
        for (i in 0..10) {
            state.emit(i)
            delay(1000)
        }
    }
                                        
    launch {
        delay(2000)
        state.collect {
            println("receive state $it")
        }
    }
}

可以看到最终的结果是:

receive state 2
receive state 3
receive state 4
receive state 5
receive state 6
receive state 7
receive state 8
receive state 9
receive state 10

因为在接受之前 delay 了 2s,所以最后是从 2 开始接收的。

把普通的 Flow 转化成 StateFlow 。

val flow = flow {
    for (i in 0..4) {
        emit(i)
        delay(100)
    }
}
coroutineScope {
    val stateFlow = flow.stateIn(this)
    launch {
        stateFlow.collect {
            println("receive flow.stateIn value $it")
        }
    }
}

我们同样可以像 LiveData 一样直接获取它的值。

stateFlow.value

StateFlow 分为 StateFlowMutableStateFlow 。就像 LiveDataMutableLiveData 一样。 StateFlow 只能接收数据,不能发送数据,而 MutableStateFlow 即可以发送也可以接收。

private suspend fun simpleStateFlowAndMutableStateFlow() {
    val mutableStateFlow = MutableStateFlow(1)
    coroutineScope {
        launch {
            collectData(mutableStateFlow.asStateFlow())
        }
        launch {
            (1..10).forEach {
                delay(100)
                mutableStateFlow.emit(it)
            }
        }
    }
}

如上代码所述,可以将 MutableStateFlow 通过 asStateFlow 转换成 StateFlow 。

StateFlow 中给我们提供了一个协程安全的并发修改 StateFlow 中的值的方法 compareAndSet 。该方法能够保证原子的修改 StateFlow 的值。该方法是通过 CAS 来修改值。

public fun compareAndSet(expect: T, update: T): Boolean

将当前的值和期待的值进行比较,如果相等则更新当前的值,并返回 true,如果不相等则返回 false。这里的比较并修改是原子性的。

SharedFlow

SharedFlowStateFlow 相比,他有缓冲区区,并可以定义缓冲区的溢出规则,已经可以定义给一个新的接收器发送多少数据的缓存值。

SharedFlow 同样有与之对应的 MutableSharedFlowMutableSharedFlow 的参数如下:

  • replay 给一个新的订阅者发送的缓冲区的数量。
  • extraBufferCapacity 除了 replay 的数量之外的缓冲区的大小。
  • onBufferOverflow 缓冲区溢出规则
    • SUSPEND 挂起
    • DROP_OLDEST 移除旧的值
    • DROP_LATEST 移除新的值

SharedFlow 的缓冲区大于是 replay + extraBufferCapacity 。

注意相比于 MutableStateFlow , MutableSharedFlow 不需要初始值。

suspend fun simpleSharedFlow() {
    val sharedFlow = MutableSharedFlow<Int>(
        replay = 5,
        extraBufferCapacity = 3,
    )
    coroutineScope {
        launch {
            sharedFlow.collect {
                println("collect1 received shared flow $it")
            }
        }
        launch {
            (1..10).forEach {
                sharedFlow.emit(it)
                delay(100)
            }
        }
        // wait a minute
        delay(1000)
        launch {
            sharedFlow.collect {
                println("collect2 received shared flow $it")
            }
        }
    }
}

同样的,我们可以把普通的 Flow 转换成 SharedFlow。

suspend fun simpleConvertToSharedFlow(started: SharingStarted) {
    var start = 0L
    // create normal flow
    val flow = (1..10).asFlow()
        .onStart { start = currTime() }
        .onEach {
            println("Emit $it ${currTime() - start}ms")
            delay(100)
        }
    // convert to shared flow
    // need coroutine scope
    coroutineScope {
        val sharedFlow = flow.shareIn(this, started, replay = 2)
        delay(400)
        launch {
            println("current time ")
            sharedFlow.collect {
                println("received convert shared flow $it at ${currTime() - start}ms")
            }
        }
    }
}

这里的转换有些复杂,可以看到我们通过 shareIn 可以将普通的 flow 转换成 SharedFlow ??梢钥吹?sharedIn 有三个参数:

  • CoroutineScope - sharing 的协程的作用域。
  • SharingStarted - 启动模式
    • Eagerly 迫切的,渴望的,在转换完成后立即开始 sharing 数据,当上游的数据超过 replay 的时候,前面的数据就会被丢弃,相当于 DROP_OLDEST
    • Lazily 当有第一个订阅者(调用 collect)的时候开始发射数据。
    • WhileSubscribed 当第一个订阅者出现的时候立即开始,当最后一个订阅者消失的时立即停止(默认情况下),replay 数量的缓存值将永远保留(默认情况下)。这是一个函数,可以通过参数来控制当最后一个订阅者消失时的行为,以及缓存的有效期。
      • stopTimeoutMillis - 配置最后一个订阅者消失后 sharing flow 停止的延时。
      • replayExpirationMillis - 配置 sharing flow 协程的停止和重置缓冲区之间的间隔,单位是毫秒,默认值为 Long.MAX_VALUE 缓存永远都不重置,0 表示立即重置缓存。比较难懂可以看看下面的例子。
  • replay 当订阅的时候回复的数量。

如果上面的函数中传递的是 Eagerly ,其输出如下:

Emit 1 2ms
Emit 2 109ms
Emit 3 213ms
Emit 4 313ms
current time 
received convert shared flow 2 at 412ms
received convert shared flow 3 at 412ms
Emit 5 413ms
received convert shared flow 4 at 414ms
Emit 6 518ms
received convert shared flow 5 at 519ms
Emit 7 619ms
received convert shared flow 6 at 619ms
Emit 8 720ms
received convert shared flow 7 at 720ms
Emit 9 822ms
received convert shared flow 8 at 823ms
Emit 10 926ms
received convert shared flow 9 at 926ms
received convert shared flow 10 at 1027ms

如果传入的是 Lazily ,其输入如下:

current time 
Emit 1 2ms
Emit 2 105ms
received convert shared flow 1 at 106ms
Emit 3 209ms
received convert shared flow 2 at 209ms
Emit 4 313ms
received convert shared flow 3 at 313ms
Emit 5 415ms
received convert shared flow 4 at 415ms
Emit 6 518ms
received convert shared flow 5 at 518ms
Emit 7 622ms
received convert shared flow 6 at 622ms
Emit 8 725ms
received convert shared flow 7 at 725ms
Emit 9 826ms
received convert shared flow 8 at 826ms
Emit 10 932ms
received convert shared flow 9 at 932ms
received convert shared flow 10 at 1032ms

很明显能够看出两者的区别。

下面看看 WhileSubscribed ,这种方式非常灵活。

fun currTime() = System.currentTimeMillis()

suspend fun simpleConvertToSharedFlow(started: SharingStarted) {
    var start = 0L
    // create normal flow
    val flow = (1..10).asFlow()
        .onStart { start = currTime() }
        .onEach {
            println("Emit $it ${currTime() - start}ms")
            delay(100)
        }
    // convert to shared flow
    // need coroutine scope
    coroutineScope {
        val sharedFlow = flow.shareIn(this, started, replay = 2)
        val job = launch {
            println("current time ")
            sharedFlow.collect {
                println("received convert shared flow $it at ${currTime() - start}ms")
            }
        }

        launch {
            delay(1000L)
            job.cancel()
            delay(110L)
            sharedFlow.collect {
                println("received again shared flow $it")
            }
            println("shared flow has stop")
        }
    }
}

@OptIn(ExperimentalTime::class)
suspend fun main() {
//    simpleSharedFlow()
    simpleConvertToSharedFlow(
        SharingStarted.WhileSubscribed(
            stopTimeout = 100L.toDuration(DurationUnit.MILLISECONDS),
            replayExpiration = 200L.toDuration(DurationUnit.MILLISECONDS)
        )
    )
}

这里配置当最后一个订阅者消失时 delay 100ms 后停止 sharing flow,在 sharing flow 停止后 200ms 后让缓存失效。这里可以通过调整 job.cancel 后的 delay 函数的时长来看看效果。当时间为 110ms 时,会重新接受到缓存 9 和 10,并重新开始 sharing flow,如果参数调整为 320ms 时,缓存会失效,会直接重新开始 sharing flow。

110ms 的结果:

current time 
Emit 1 1ms
Emit 2 107ms
received convert shared flow 1 at 108ms
Emit 3 211ms
received convert shared flow 2 at 211ms
Emit 4 315ms
received convert shared flow 3 at 315ms
Emit 5 417ms
received convert shared flow 4 at 417ms
Emit 6 521ms
received convert shared flow 5 at 521ms
Emit 7 623ms
received convert shared flow 6 at 624ms
Emit 8 727ms
received convert shared flow 7 at 727ms
Emit 9 829ms
received convert shared flow 8 at 829ms
Emit 10 933ms
received convert shared flow 9 at 933ms
received again shared flow 9
received again shared flow 10
Emit 1 0ms
Emit 2 105ms
received again shared flow 1
Emit 3 210ms
received again shared flow 2
Emit 4 314ms
received again shared flow 3
Emit 5 415ms
received again shared flow 4
Emit 6 519ms
received again shared flow 5
Emit 7 620ms
received again shared flow 6
Emit 8 721ms
received again shared flow 7
Emit 9 826ms
received again shared flow 8
Emit 10 927ms
received again shared flow 9
received again shared flow 10

320ms 的结果:

current time 
Emit 1 1ms
Emit 2 106ms
received convert shared flow 1 at 106ms
Emit 3 210ms
received convert shared flow 2 at 210ms
Emit 4 314ms
received convert shared flow 3 at 314ms
Emit 5 414ms
received convert shared flow 4 at 414ms
Emit 6 517ms
received convert shared flow 5 at 517ms
Emit 7 623ms
received convert shared flow 6 at 623ms
Emit 8 726ms
received convert shared flow 7 at 727ms
Emit 9 827ms
received convert shared flow 8 at 827ms
Emit 10 931ms
received convert shared flow 9 at 931ms
Emit 1 0ms
Emit 2 105ms
received again shared flow 1
Emit 3 209ms
received again shared flow 2
Emit 4 315ms
received again shared flow 3
Emit 5 418ms
received again shared flow 4
Emit 6 523ms
received again shared flow 5
Emit 7 627ms
received again shared flow 6
Emit 8 732ms
received again shared flow 7
Emit 9 833ms
received again shared flow 8
Emit 10 937ms
received again shared flow 9
received again shared flow 10

我们看下面这段源码就会很快明白:

override fun command(subscriptionCount: StateFlow<Int>): Flow<SharingCommand> = subscriptionCount
    .transformLatest { count ->
        if (count > 0) {
            emit(SharingCommand.START)
        } else {
            delay(stopTimeout)
            if (replayExpiration > 0) {
                emit(SharingCommand.STOP)
                delay(replayExpiration)
            }
            emit(SharingCommand.STOP_AND_RESET_REPLAY_CACHE)
        }
    }
    .dropWhile { it != SharingCommand.START } // don't emit any STOP/RESET_BUFFER to start with, only START
    .distinctUntilChanged() // just in case somebody forgets it, don't leak our multiple sending of START
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容