Flow 监听:36. launchIn 并发监听

本篇解析 example-flow-36.kt。学习如何在不阻塞当前执行流的情况下收集 Flow。

1. 核心操作符:launchIn

launchIn(scope) 是一个末端操作符,它会启动一个新的协程来收集流。

特点:

  • 非阻塞:它不会挂起当前的协程。调用它之后,代码会立即向下执行。
  • 作用域绑定:它需要一个 CoroutineScope 参数,流的生命周期会与这个作用域绑定。

2. 代码解析

1
2
3
4
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // 在当前作用域启动新协程收集
println("Done") // 这行会立即执行
  • 结果分析:你会先看到 “Done”,然后再看到 “Event: 1”, “Event: 2” 等日志。

3. 开发者感悟

launchIn 是 Android 开发中最常用的流收集方式。在 ActivityFragment 中,我们通常使用 lifecycleScopeviewLifecycleOwner.lifecycleScope 作为参数。它能确保在不阻塞主线程的同时,自动在界面销毁时停止监听,防止内存泄漏。

Flow 取消:37. 构建器中的取消检查

本篇解析 example-flow-37.kt。探讨 flow { ... } 构建器对取消的默认支持。

1. 核心特性:自动检查 ensureActive

为了开发者的便利,flow { ... } 构建器在每次发射值(emit)时,都会自动执行一次 ensureActive() 检测。

代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
fun foo(): Flow<Int> = flow { 
for (i in 1..5) {
println("Emitting $i")
emit(i) // 这里会检查协程是否已被取消
}
}

fun main() = runBlocking {
foo().collect { value ->
if (value == 3) cancel() // 收集到 3 时取消协程
println(value)
}
}
  • 结果分析:当发射器尝试发射 4 时,emit 内部检测到协程已取消,会抛出 CancellationException。因此 4 和 5 永远不会被发射。

2. 开发者感悟

这是一个非常人性化的设计。它意味着即使你在 flow { ... } 里写了一个繁忙的计算循环,只要你还在不断地 emit,这个流就是可以被外部及时取消的。

Flow 取消:38. 操作符中的取消检查

本篇解析 example-flow-38.kt。探讨 Flow 在性能优先下的取消行为。

1. 核心问题:性能优于检查

出于性能原因,大多数 Flow 操作符(如 asFlow, map, filter不会自行执行额外的取消检测。

代码解析

1
2
3
4
5
6
fun main() = runBlocking {
(1..5).asFlow().collect { value ->
if (value == 3) cancel() // 收集到 3 时请求取消
println(value)
}
}
  • 结果分析:在这个例子中,即使在 3 之后调用了 cancel(),4 和 5 依然会被打印出来。
  • 原因asFlow() 只是简单地循环发射值,并没有像 flow { ... } 那样在 emit 之前检查 ensureActive()

2. 开发者感悟

这是一个非常关键的细节。它提醒我们:当你使用便捷构建器(如 asFlow)处理一个密集的循环时,如果中途被取消了,流可能并不会立即停止,而是会“惯性”地跑完剩余任务。

Flow 取消:39. 使用 cancellable 启用取消检查

本篇解析 example-flow-39.kt。学习如何让原本无法取消的流变为可取消。

1. 核心操作符:cancellable()

cancellable() 操作符会为流的每次发射添加一个 ensureActive() 检查。

代码解析

1
2
3
4
5
6
7
8
fun main() = runBlocking {
(1..5).asFlow()
.cancellable() // 手动开启取消检测
.collect { value ->
if (value == 3) cancel()
println(value)
}
}
  • 结果分析:与示例 38 不同,开启 cancellable() 后,当收集到 3 并调用 cancel() 时,流会在发射 4 之前检测到取消,并立即停止。

2. 开发者感悟

cancellable 是一个“安全补丁”。当你使用的流构建器(如 asFlow)没有内置取消检测,且你的处理逻辑又非常繁忙且需要响应取消时,请务必加上它。


附录:Flow 系列总结

恭喜你!到这里你已经完成了 Flow 的全部核心教程。

Flow 处理黄金法则:

  1. 冷流原则:不收集,不运行。
  2. 异步流:在 flow { ... } 中随意挂起,不阻塞线程。
  3. 线程调度:使用 flowOn 改变上游,保持下游(collect)与调用者一致。
  4. 性能优化:用 buffer 并发,用 conflatecollectLatest 处理过快数据。
  5. 异常安全:用 catch 优雅处理错误,用 onCompletionfinally 释放资源。
  6. 生命周期:在 Android 中配合 LifecycleScopelaunchIn 监听数据,实现自动取消。

01/80 统计素数个数-暴力算法

题目描述

给定一个整数数组,统计其中素数(质数)的个数。
素数定义:在一个大于 1 的自然数中,除了 1 和它本身以外不再有其他因数的自然数。


核心思路

1. 暴力判定法

对于每一个数字 n,我们从 2 开始尝试除到 n-1。如果期间能被整除,说明不是素数。

2. 优化逻辑:$\sqrt{n}$ 法

其实我们不需要一直除到 n-1

  • 如果 $n = a \times b$,那么 $a$ 和 $b$ 中至少有一个会小于或等于 $\sqrt{n}$。
  • 例如判断 16 是否为素数,我们只需要检查 2、3、4。因为如果 4 以后还有因子(如 8),那么必然对应一个小于 4 的因子(如 2),而我们在检查到 2 时就已经发现了。
  • 结论:只需要遍历到 sqrt(n) 即可将单次判定的复杂度从 $O(n)$ 降低到 $O(\sqrt{n})$。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 判断一个数是否为素数
*/
fun isPrime(n: Int): Boolean {
// 小于等于 1 的数不是素数
if (n <= 1) return false

// 基础优化:从 2 遍历到 sqrt(n)
var i = 2
while (i * i <= n) {
if (n % i == 0) {
return false
}
i++
}
return true
}

/**
* 统计数组中素数的个数
*/
fun countPrimesInArray(arr: IntArray): Int {
var count = 0
for (num in arr) {
if (isPrime(num)) {
count++
}
}
return count
}

fun main() {
val arr = intArrayOf(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
// 预期结果:2, 3, 5, 7, 11, 13 共 6 个
println("数组中素数的个数为: ${countPrimesInArray(arr)}")
}

复杂度分析

  • 时间复杂度:$O(N \cdot \sqrt{M})$。
    • 其中 $N$ 为数组长度,$M$ 为数组中数值的大小。
    • 对于数组中的每个元素,我们执行一次判定,判定耗时 $O(\sqrt{M})$。
  • 空间复杂度:$O(1)$。只使用了常数个变量进行计数和迭代。

进阶思考:如果要统计 0 到 N 之间的所有素数?

对于大规模范围内的素数统计(例如统计 100 万以内的素数个数),暴力算法会显得力不从心。此时可以采用 埃拉托斯特尼筛法(Sieve of Eratosthenes)

  1. 建立一个布尔数组 isPrime,初始全部设为 true
  2. 从 2 开始,如果是素数,则将其所有的倍数(2x, 3x…)标记为 false
  3. 这样每个合数只会被标记几次,时间复杂度可以大幅优化至 $O(N \log \log N)$。

02/80 统计素数个数-埃氏筛选法

题目描述

给定一个正整数 $n$ 或一个整数数组,统计其中素数的个数。
对于大规模范围内的素数统计,暴力判定法($O(N\sqrt{N})$)效率较低。本篇介绍经典的 埃拉托斯特尼筛法(Sieve of Eratosthenes)


核心思路

埃氏筛法的核心思想是:“素数的倍数一定是合数”

  1. 初始化:建立一个长度为 max + 1 的布尔数组 isPrime,初始全部设为 true。将 01 设为 false
  2. 筛选:从 2 开始遍历到 sqrt(max)
    • 如果当前数字 i 是素数,则将其所有的倍数($2i, 3i, 4i \dots$)标记为 false(非素数)。
  3. 优化点
    • 从 $i^2$ 开始标记:在筛选素数 i 的倍数时,小于 $i^2$ 的倍数(如 $2i, 3i \dots$)其实已经被之前的素数($2, 3 \dots$)标记过了。因此直接从 $i \times i$ 开始标记可以减少重复操作。
    • 遍历边界:只需遍历到 sqrt(max) 即可完成所有合数的标记。

代码实现 (Kotlin)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import kotlin.math.sqrt

/**
* 使用埃氏筛法统计数组中的素数个数
*/
fun countPrimesInArray(arr: IntArray): Int {
if (arr.isEmpty()) return 0

// 1. 找到数组中的最大值,确定筛的大小
val maxVal = arr.maxOrNull() ?: 0
if (maxVal < 2) return 0

// 2. 预处理:构建埃氏筛
val isPrime = BooleanArray(maxVal + 1) { true }
isPrime[0] = false
isPrime[1] = false

val limit = sqrt(maxVal.toDouble()).toInt()
for (i in 2..limit) {
if (isPrime[i]) {
// 从 i*i 开始标记,步长为 i
for (j in i * i..maxVal step i) {
isPrime[j] = false
}
}
}

// 3. 查表统计结果
var count = 0
for (num in arr) {
if (num >= 0 && isPrime[num]) {
count++
}
}

return count
}

fun main() {
val arr = intArrayOf(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
// 预期结果:2, 3, 5, 7, 11, 13 共 6 个
println("数组中素数的个数为: ${countPrimesInArray(arr)}")
}

复杂度分析

  • 时间复杂度:$O(M \log \log M + N)$。
    • $M$ 是数组中的最大值。构建筛子的复杂度是数学上证明的 $O(M \log \log M)$,接近线性。
    • $N$ 是数组长度,用于最后的遍历统计。
  • 空间复杂度:$O(M)$。需要一个长度为 $M+1$ 的布尔数组来存储筛选状态。

算法对比与总结

特性 暴力判定法 埃氏筛法
时间复杂度 $O(N \cdot \sqrt{M})$ $O(M \log \log M)$
空间复杂度 $O(1)$ $O(M)$
适用场景 数组元素较少,但数值极大 数组元素多,或需要频繁查询某一范围内的素数

技巧提示:如果题目是要求“统计 $n$ 以内的素数个数”,埃氏筛法是面试中的标准答案。如果是针对“数组中”的素数且数组很大,先求最大值并建筛(查表法)是最高效的工业实践。

03/80 删除排序数组中的重复项

题目描述

给你一个 升序排列 的数组 nums ,请你 原地 删除重复出现的元素,使每个元素 只出现一次 ,返回删除后数组的新长度。元素的 相对顺序 应该保持一致。

由于在某些语言中不能改变数组的长度,所以必须将结果放在数组 nums 的第一部分。更具体地,如果在删除重复项后有 k 个元素,那么 nums 的前 k 个元素应该保存最终结果。

注意: 不要使用额外的空间,你必须在 原地修改输入数组 并在使用 $O(1)$ 额外空间的条件下完成。


核心思路:快慢指针

这道题是“双指针”技巧中最经典的应用场景之一。

  1. 快慢指针定义
    • **慢指针 slow**:指向当前已确定的不重复序列的最后一个位置。
    • **快指针 fast**:遍历整个数组,寻找下一个不重复的元素。
  2. 移动逻辑
    • 初始时,slow 指向索引 0
    • fast 从索引 1 开始向后遍历。
    • nums[fast] != nums[slow] 时,说明找到了一个新元素。此时:
      • slow 向后移一位(slow++)。
      • nums[fast] 的值赋给 nums[slow]
  3. 最终结果
    • 由于 slow 是索引,最终不重复数组的长度就是 slow + 1

代码实现 (Kotlin)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class Solution {
/**
* 删除排序数组中的重复项
* @param nums 升序排列的整数数组
* @return 剔除重复项后数组的新长度
*/
fun removeDuplicates(nums: IntArray): Int {
// 1. 边界处理:若数组为空,长度为 0
if (nums.isEmpty()) return 0

// 2. 初始化慢指针
var slow = 0

// 3. 快指针 fast 遍历数组
for (fast in 1 until nums.size) {
// 如果快指针指向的元素与慢指针不同
if (nums[fast] != nums[slow]) {
// 慢指针后移,并更新其位置的值
slow++
nums[slow] = nums[fast]
}
}

// 4. 返回长度(索引 + 1)
return slow + 1
}
}

fun main() {
val solution = Solution()
val nums = intArrayOf(0, 0, 1, 1, 1, 2, 2, 3, 3, 4)
val length = solution.removeDuplicates(nums)

println("处理后的数组长度: $length")
print("数组内容: ")
for (i in 0 until length) {
print("${nums[i]} ")
}
}

复杂度分析

  • 时间复杂度:$O(N)$。其中 $N$ 是数组的长度。快指针 fast 遍历一次数组即可完成。
  • 空间复杂度:$O(1)$。我们只使用了常数级别的额外空间(两个指针变量),完全在原地进行修改。

关键点总结

  1. **原地修改 (In-place)**:题目明确要求不能创建新数组,因此双指针覆盖原有位置是唯一高效的做法。
  2. 升序前提:因为数组是有序的,所以重复的元素必然是相邻的。这使得我们只需要比较 nums[fast]nums[slow] 即可判断是否重复。
  3. 通用扩展
    • 如果题目要求 “每个元素最多出现两次” 怎么办?
    • 逻辑只需改为:nums[fast] != nums[slow - 1] 即可实现允许重复两次的逻辑。

04/80 寻找数组的中心下标

题目描述

给你一个整数数组 nums ,请计算数组的 中心下标

数组 中心下标 是数组的一个下标,其左侧所有元素相加的和等于右侧所有元素相加的和。

如果中心下标位于数组的左端,那么左侧数之和视为 0 ,因为下标的左侧不存在元素。这一点对于中心下标位于数组右端的情况同样适用。

如果数组有多个中心下标,应该返回 最靠近左边 的那个。如果数组不存在中心下标,返回 -1


核心思路:前缀和思想

这道题最直白的解法是对每个位置都重新计算左右两侧之和,但那样复杂度会达到 $O(N^2)$。利用“前缀和”的思想,我们可以将复杂度优化到 $O(N)$。

1. 数学推导

设数组总和为 totalSum,当前下标为 i,其左侧元素之和为 leftSum,当前元素值为 nums[i]
根据题目要求,如果 i 是中心下标,则其右侧之和也必须等于 leftSum

那么整个数组的和可以表示为:
$$totalSum = leftSum + nums[i] + rightSum$$

因为 $rightSum = leftSum$,所以:
$$totalSum = leftSum + nums[i] + leftSum$$
$$totalSum = 2 \times leftSum + nums[i]$$

结论:我们只需维护一个变量 leftSum,在遍历过程中检查 2 * leftSum + nums[i] == totalSum 是否成立即可。


代码实现 (Kotlin)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
class Solution {
/**
* 寻找数组的中心下标
* @param nums 整数数组
* @return 中心下标,若不存在则返回 -1
*/
fun pivotIndex(nums: IntArray): Int {
// 1. 先计算数组的总和
val totalSum = nums.sum()

// 2. 维护左侧元素之和
var leftSum = 0

// 3. 遍历数组寻找满足条件的下标
for (i in nums.indices) {
val currentValue = nums[i]

// 检查:2 * 左侧和 + 当前值 == 总和
// 这等价于:左侧和 == 总和 - 左侧和 - 当前值(即右侧和)
if (2 * leftSum + currentValue == totalSum) {
return i
}

// 更新左侧和,供下一个位置使用
leftSum += currentValue
}

return -1
}
}

fun main() {
val solution = Solution()
val nums = intArrayOf(1, 7, 3, 6, 5, 6)
// 预期输出:3
// 解释:左侧和 (1+7+3=11),右侧和 (5+6=11)
println("中心下标为: ${solution.pivotIndex(nums)}")
}

复杂度分析

  • 时间复杂度:$O(N)$。其中 $N$ 是数组的长度。我们首先进行了一次 $O(N)$ 的求和,然后又进行了一次 $O(N)$ 的遍历。
  • 空间复杂度:$O(1)$。我们只使用了常数级别的额外空间(totalSumleftSum 变量)。

关键点总结

  1. 空间优化:不需要像传统的“前缀和”问题那样创建一个 $O(N)$ 长度的辅助数组,仅通过两个变量动态维护状态即可。
  2. 边界处理
    • i = 0 时,leftSum 初始为 0。如果 0 + nums[0] + 0 == totalSum,说明第一个元素就是中心下标,逻辑完美兼容。
  3. 最左侧优先:通过 for 循环从前向后遍历,一旦找到第一个满足条件的下标就立即返回,确保了结果是“最靠近左边”的。

协程核心概念:CoroutineScope 与 CoroutineContext

图片描述

1. 什么是 CoroutineScope?

CoroutineScope(协程作用域)是协程的生命周期管家。它的源码非常简单,本质上只是对 CoroutineContext 的封装:

1
2
3
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}

它的核心作用: 追踪所有在该作用域内启动的协程。通过 scope.cancel(),你可以一键关闭该作用域下的所有子协程,从而彻底避免内存泄漏。


2. 在非 LifecycleOwner 类中手动管理协程

在普通 Java/Kotlin 类中,没有现成的 lifecycleScope 可用。此时需要手动构建并销毁作用域。

代码模板:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import kotlinx.coroutines.*

class MyRepository {
// 1. 定义 Job,推荐使用 SupervisorJob
private val repositoryJob = SupervisorJob()

// 2. 构建作用域:组合调度器与 Job
private val repositoryScope = CoroutineScope(Dispatchers.Main + repositoryJob)

fun fetchData() {
// 3. 使用作用域启动协程
repositoryScope.launch {
// ...
}
}

// 4. 在类销毁时手动取消
fun clear() {
repositoryJob.cancel()
}
}

3. 深入理解 CoroutineContext

如果说 CoroutineScope 是管家,那么 CoroutineContext 就是管家手里的配置清单。它由一组元素组成,每个元素都有唯一的 Key

核心元素列表:

  • Job: 控制协程生命周期。
  • CoroutineDispatcher: 决定在哪个线程运行。
  • CoroutineName: 协程名称(调试利器)。
  • CoroutineExceptionHandler: 未捕获异常的最后一道防线。

4. 源码解析:协程的 “+” 号代表什么?

当我们写下 Dispatchers.Main + Job() 时,其实是调用了 CoroutineContextplus 操作符。

核心源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// 确保拦截器(Interceptor)永远在末尾
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}

逻辑拆解:

  1. 右侧优先:相同 Key 的元素,右边的会覆盖左边的。
  2. 拦截器置后:确保调度器永远处于链表的末尾,以实现快速查找性能。

Kotlin中的惰性操作容器——Sequence

Sequence序列

Sequence 是Kotlin标准库提供的一种容器类型。它和Iterable一样具备对集合进行多步骤操作能力,但是却是采用了一种完全不同于Iterable的实现方式:

1
2
3
4
5
6
7
8
val map = (0..3).filter {
println("filter:$it")
it % 2 == 0
}.map {
println("map:$it")
it + 1
}
println(map)

上面的代码用来演示Iterable进行连续操作的情况。它的输出如下:

1
2
3
4
5
6
7
filter:0
filter:1
filter:2
filter:3
map:0
map:2
[1, 3]

mapfilter这些链式集合函数它们都会立即执行并创建中间临时集用来保存数据。当原始数据不多时,这并不会有什么影响。但是,当原始数据量非常大的时候。这就会变的非常低效。而此时,就可以借助Sequence提高效率。

1
2
3
4
5
6
7
8
9
val sequence = (0..3).asSequence().filter {
println("filter:$it")
it % 2 == 0
}.map {
println("map:$it")
it + 1
}
println("准备开始执行")
println(sequence.toList())

上面的代码执行结果如下:

1
2
3
4
5
6
7
8
准备开始执行
filter:0
map:0
filter:1
filter:2
map:2
filter:3
[1, 3]

对比Iterable和Sequence:

Iterable是即时的、Sequence是惰性的:前者会要求尽早的计算结果,因此在多步骤处理链的每一环都会有中间产物也就是新的集合产生;后者会尽可能的延迟计算结果,Sequence处理的中间函数不进行任何计算。相反,他们返回一个新Sequence的,用新的操作装饰前一个,所有的这些计算都只是在类似toList的终端操作期间进行。

img

区分中间操作符和末端操作符的方式也很简单:如果操作符返回的是一个Sequence类型的数据,它就是中间操作符。

在操作的执行方式上也有所不同:Iterable每次都是在整个集合执行完操作后再进行下一步操作——采用第一个操作并将其应用于整个集合,然后移动到下一个操作,官方将其称呼为急切式或者按步骤执行(Eager/step-by-step);而Sequence则是逐个对每个元素执行所有操作。是一种惰性顺序——取第一个元素并应用所有操作,然后取下一个元素,依此类推。官方将其称呼为惰性式或者按元素执行(Lazy/element-by-element)

序列的惰性会带来一下几个优点:

  • 它们的操作按照元素的自然顺序进行;
  • 只做最少的操作;
  • 元素可以是无限多个;
  • 不需要在每一步都创建集合

Sequence可避免生成中间步骤的结果,从而提高了整个集合处理链的性能。但是,惰性性质也会带来一些运行开销。所以在使用时要权衡惰性开销和中间步骤开销,在Sequence和Iterable中选择更加合适的实现方式。

执行的顺序

1
2
3
4
5
6
7
8
9
10
11
sequenceOf(1,2,3)
.filter { print("F$it, "); it % 2 == 1 }
.map { print("M$it, "); it * 2 }
.forEach { print("E$it, ") }
// Prints: F1, M1, E2, F2, F3, M3, E6,

listOf(1,2,3)
.filter { print("F$it, "); it % 2 == 1 }
.map { print("M$it, "); it * 2 }
.forEach { print("E$it, ") }
// Prints: F1, F2, F3, M1, M3, E2, E6,

sequence的执行时按照元素进行的,依次对元素执行所有的操作,对一个元素而言,所有操作时依次全部执行的。而普通集合操作则是以操作步骤进行的,当所有的元素执行完当前操作后才会进入下一个操作。

img

只做最少的操作

试想一下我们有一千万个数字,我们要经过几次变换取出20个,使用下面的代码对比一下序列和不同集合操作的性能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
fun main(){
val fFlow = FFlow()
fFlow.demoList()
fFlow.demoSequence()
}

fun demoSequence() {
val currentTimeMillis = System.currentTimeMillis()
val list =
(0..10000000).asSequence().map { it * 2 }.map { it - 1 }.take(20).toList()
println("demoSequence:${System.currentTimeMillis() - currentTimeMillis}:$list")
}

fun demoList() {
val currentTimeMillis = System.currentTimeMillis()
val list =
(0..10000000).map { it * 2 }.map { it - 1 }.take(20).toList()
println("demoList:${System.currentTimeMillis() - currentTimeMillis}:$list")
}

输出的结果如下:

1
2
demoSequence:20ms:[-1, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37]
demoList:4106ms:[-1, 1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37]

这就是只执行最少操作的意思,序列按照元素顺序执行,当取够29个元素之后便会立即停止计算。而不同的集合则不同,没有中间操作的概念。它的每次操作都会对整个数组中的所有元素执行完才会进行下一个——也就是前两个map都要执行一千万次。

img

序列可以是无限的

看如下代码:

1
2
3
4
5
6
var list = emptyArray<Int>()
var i = 0
while(true){
list[i] = i++
}
list.take(10)

很明显,这段代码是没法正常运行的,因为这里有一个死循环。我们也无法创建一个无限长度的集合。但是:因为序列式按步骤依照需求进行处理的,所哟我们可以创建无限序列:

1
2
3
4
5
6
7
8
9
val noEnd = sequence {
var i = 1
while (true) {
yield(i)
i *= 2
}
}
noEnd.take(4).toList()
//输出:[1, 2, 4, 8]

但是一定要注意,我们虽然可以这么写,但是务必不能真的让while一直循环。我们不能直接使用toList。必须提供一个能结束循环的操作符,也就是不能取出所有元素(无限个)——要么使用类似take的操作来限制它们的数量,要么使用不需要所有元素的终端操作,例如first, find, any, all, indexOf等。

序列不会在每个步骤创建集合

普通的集合会在每次变换之后都会创建新的集合取存储所有变换后的元素。而每次创建集合和填入数据都会带来不小的性能开销。尤其是当我们处理大量或大量的集合时,性能问题会愈发凸显。而序列的按元素操作,则不会有这个问题。除非手动调用了终端操作符,否则不会生成新的集合。

Sequence的基本使用

Sequence序列的使用和普通的Iterable极其相似,实际上其内部也还是借助Iterable实现的。在研究它的内部实现原理之前,想从Sequence的创建和基本的序列操作来演示Sequence的基本用法。

序列的创建

创建Sequence的方式大概可以分为。分别是由元素创建、通过Iterable、借助函数以及由代码块创建。

由元素创建:通过调用顶级函数sequenceOf实现:

1
2
val ints = sequenceOf(1, 2, 3, 4, 5, 6, 7)
val strings = sequenceOf("a","b","c","d","e")

通过Iterable转化:借助Iterable的扩展函数asSequence实现:

1
2
val ints = listOf(1, 2, 3, 4, 5, 6, 7).asSequence()
val strings = listOf("a","b","c","d","e").asSequence()

通过generateSequence实现:该方法有三个:

1
2
3
generateSequence(seedFunction: () -> T?, nextFunction: (T) -> T?): Sequence<T> 
generateSequence(seed: T?, nextFunction: (T) -> T?): Sequence<T>
generateSequence(nextFunction: () -> T?): Sequence<T>

最终都是通过GeneratorSequence实现的,这里先不进行源码分析。只讨论使用方式:

  • 其中三个函数都有的形参nextFunction可以理解为元素生成器,序列里的元素都通过调用该函数生成,当它返回为null是,序列停止生成(所以,nextFunction必须要在某个情况下返回null,否则会因为序列元素是无限多个触发java.lang.OutOfMemoryError: Java heap space异常)。
  • 而另外两个的seedFunction和seed形参都是为了确定数据初始值的。区别在于一个直接指明,一个通过调用函数获取。

分别用这三个函数生成0~100的序列,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
val generateSequenceOne = generateSequence {
if (i < 100) {
i++
} else {
null
}
}
val generateSequenceTwo = generateSequence(0) {
if (it < 100) {
it+1//此处的it是上一个元素
} else {
null
}
}

val generateSequenceThree = generateSequence({ 0 }) {
if (it < 100) {
it+1//此处的it是上一个元素
} else {
null
}
}

由代码块生成:借助sequence(block: suspend SequenceScope.() -> Unit)函数。改函数接受一个挂起函数,该函数会接受一个SequenceScope实例,这个实例无需我们创建(后面源码分析会讲到)。SequenceScope提供了yieldyieldAll方法复杂返回序列元素给调用者,并暂停序列的执行,直到使用者请求下一个元素。

用该函数生成0~100的序列,代码如下:

1
2
3
4
5
val ints = sequence {
repeat(100) {
yield(it)
}
}

序列的操作

对序列的操作可以分为中间操作和末端操作两种。它们只要有一下另种区别:

  • 中间操作返回惰性生成的一个新的序列,而末端序列则为其他普通的数据类型;
  • 中间操作不会立刻执行代码,仅当执行了末端操作序列才会开始执行。

常见的中间操作包括:map、fliter、first、last、take等;它们会序列提供数据变化过滤等增强功能基本上和kotlin提供的集合操作符有着相同的功能。

常见的末端操作有:toList、toMutableList、sum、count等。它们在提供序列操作功能的同时,还会触发序列的运行。

Sequence源码分析

上文对序列做了简单的入门介绍。接下来深入源码去了解一下Sequence的实现方式。

Sequence是什么?

Kotlin对的定义Sequence很简单:

1
2
3
public interface Sequence <out T> {
public operator fun iterator(): Iterator<T>
}

就是一个接口,定义了一个返回Iterator的方法。接口本身只定义了Sequence具有返回一个迭代器的能力。具体的功能实现还是靠它的实现类完成。

可以概括一些:序列就是一个具备提供了迭代器能力的类。

序列的创建方式分析

结合上文中提到的序列的四种创建方式,我们依次分析一下它的创建流程。

我们首先以比较常用的通过Iterable转化获取序列,它需要借助asSequence方法分析一下,使用listOf("a","b","c","d","e").asSequence()生成一个序列。调用链如下:

1
2
3
4
5
6
7
public fun <T> Iterable<T>.asSequence(): Sequence<T> {
return Sequence { this.iterator() }
}

public inline fun <T> Sequence(crossinline iterator: () -> Iterator<T>): Sequence<T> = object : Sequence<T> {
override fun iterator(): Iterator<T> = iterator()
}

流程很简单,一个扩展函数加一个内联函数。最终通过匿名内部类的方式创建一个Sequence并返回。代码很好理解,实际上它的实现逻辑等同于下面的代码:

1
2
3
4
5
6
7
val sequence = MySequence(listOf("a","b","c","d","e").iterator())

class MySequence<T>(private val iterator:Iterator<T>):Sequence<T>{
override fun iterator(): Iterator<T> {
return iterator
}
}

接着看一下通过调用顶级函数sequenceOf实现,以sequenceOf("a","b","c","d","e")为例,它的调用逻辑如下:

1
public fun <T> sequenceOf(vararg elements: T): Sequence<T> = if (elements.isEmpty()) emptySequence() else elements.asSequence()

可以看到依旧是借助asSequence实现的。

接下来看一下代码块和generateSequence的实现方式,这两个方式会比较复杂一点,毕竟前面两个都是借助List进行转换,而List本身就能提供迭代器Iterator。后面两个明显需要提供新的迭代器。 首先看一下代码看的实现方式:

1
2
3
4
5
val ints = sequence {
repeat(100) {
yield(it)
}
}

其中sequence的调用逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
public fun <T> sequence(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Sequence<T> = Sequence { iterator(block) }

public fun <T> iterator(@BuilderInference block: suspend SequenceScope<T>.() -> Unit): Iterator<T> {
//创建迭代器
val iterator = SequenceBuilderIterator<T>()
iterator.nextStep = block.createCoroutineUnintercepted(receiver = iterator, completion = iterator)
return iterator
}

public inline fun <T> Sequence(crossinline iterator: () -> Iterator<T>): Sequence<T> = object : Sequence<T> {
override fun iterator(): Iterator<T> = iterator()
}

可以发现:该方法和asSequence一样最终也是通过匿名内部类的方式创建了一个Sequence。不过区别在于,该方法需要创建一个新的迭代器,也就是SequenceBuilderIterator 。同样以MySequence为例,它的创建流程等同于一下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
fun mian(){
create<Int> { myblock() }
}

suspend fun SequenceScope<Int>.myblock(){
repeat(100) {
yield(it)
}
}

fun <Int> create(block: suspend SequenceScope<Int>.() -> Unit):Sequence<Int>{
val iterator = SequenceBuilderIterator<Int>()
iterator.nextStep = block.createCoroutineUnintercepted(receiver = iterator, completion = iterator)
return MySequence(iterator)
}

当然,这是不可能实现的,因为SequenceBuilderIterator是被private修饰了,我们是无法直接访问的。这里强制写出来演示一下它的流程。

最后看一下通过generateSequence方法创建序列的源码,一共有三个:

1
2
3
4
5
6
7
8
9
10
11
12
public fun <T : Any> generateSequence(seedFunction: () -> T?, nextFunction: (T) -> T?): Sequence<T> =
GeneratorSequence(seedFunction, nextFunction)

public fun <T : Any> generateSequence(seed: T?, nextFunction: (T) -> T?): Sequence<T> =
if (seed == null)
EmptySequence
else
GeneratorSequence({ seed }, nextFunction)

public fun <T : Any> generateSequence(nextFunction: () -> T?): Sequence<T> {
return GeneratorSequence(nextFunction, { nextFunction() }).constrainOnce()
}

最终都是创建了GeneratorSequence的一个实例并返回,而GeneratorSequence实现了Sequence接口并重写了iterator()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private class GeneratorSequence<T : Any>(private val getInitialValue: () -> T?, private val getNextValue: (T) -> T?) : Sequence<T> {
override fun iterator(): Iterator<T> = object : Iterator<T> {
var nextItem: T? = null
var nextState: Int = -2 // -2 for initial unknown, -1 for next unknown, 0 for done, 1 for continue

private fun calcNext() {
nextItem = if (nextState == -2) getInitialValue() else getNextValue(nextItem!!)
nextState = if (nextItem == null) 0 else 1
}

override fun next(): T {
if (nextState < 0)
calcNext()

if (nextState == 0)
throw NoSuchElementException()
val result = nextItem as T
// Do not clean nextItem (to avoid keeping reference on yielded instance) -- need to keep state for getNextValue
nextState = -1
return result
}

override fun hasNext(): Boolean {
if (nextState < 0)
calcNext()
return nextState == 1
}
}
}

总结一下Sequence的创建大致可以分为三类:

  • 使用List自带的迭代器通过匿名的方式创建Sequence实例,sequenceOf("a","b","c","d","e")listOf("a","b","c","d","e").asSequence()就是这种方式;
  • 创建新的SequenceBuilderIterator迭代器,并通过匿名的方式创建Sequence实例。例如使用代码块的创建方式。
  • 创建GeneratorSequence,通过重写iterator()方法,使用匿名的方式创建Iterator。GeneratorSequence方法就是采用的这种方式。

看完创建方式,也没什么奇特的,就是一个提供迭代器的普通类。还是看不出是如何惰性执行操作的。接下来就分析一下惰性操作的原理。

序列的惰性原理

以最常用的map操作符为例:普通的集合操作源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public inline fun <T, R> Iterable<T>.map(transform: (T) -> R): List<R> {
//出啊年一个新的ArrayList,并调用mapTo方法
return mapTo(ArrayList<R>(collectionSizeOrDefault(10)), transform)
}

public inline fun <T, R, C : MutableCollection<in R>> Iterable<T>.mapTo(destination: C, transform: (T) -> R): C {
//遍历原始的集合,进行变换操作,然后将变换后的数据依次加入到新创建的集合
for (item in this)
destination.add(transform(item))
//返回新集合
return destination
}

可以看到:当List.map被调用后,便会立即创建新的集合,然后遍历老数据并进行变换操作。最后返回一个新的数据。这印证了上面提到的普通集合的操作时按照步骤且会立刻执行的理论。

接下来看一下序列的map方法,它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public fun <T, R> Sequence<T>.map(transform: (T) -> R): Sequence<R> {
return TransformingSequence(this, transform)
}

internal class TransformingSequence<T, R>
constructor(private val sequence: Sequence<T>, private val transformer: (T) -> R) : Sequence<R> {
override fun iterator(): Iterator<R> = object : Iterator<R> {
//注释一:TransformingSequence的iterator持有上一个序列的迭代器
val iterator = sequence.iterator()
override fun next(): R {
//注释二:在开始执行迭代时,向上调用前一个序列的迭代器。
return transformer(iterator.next())
}

override fun hasNext(): Boolean {
return iterator.hasNext()
}
}

internal fun <E> flatten(iterator: (R) -> Iterator<E>): Sequence<E> {
return FlatteningSequence<T, R, E>(sequence, transformer, iterator)
}
}

代码并不复杂,它接收用户提供的变换函数和序列,然后创建了一个TransformingSequence并返回。TransformingSequence本身和上文中提到的序列没什么区别,唯一的区别在于它的迭代器:在通过next依次取数据的时候,并不是直接返回元素。而是先调用调用者提供的函数进行变换。返回变换后的数据——这也没什么新鲜的,和普通集合的map操作符和RxJava的Map都是同样的原理。

但是,这里却又有点不一样。操作符里没有任何开启迭代的代码,它只是对序列以及迭代进行了嵌套处理,并不会开启迭代.如果用户不手动调用(后者间接调用)迭代器的next函数,序列就不会被执行——这就是惰性执行的机制的原理所在。

而且,由于操作符返回的是一个Sequence类型的值,当你重复不断调用map时,例如下面的代码:

1
2
3
4
5
6
7
8
(0..10).asSequence().map{add(it)}.map{add(it)}.map{add(it)}.toList()

//等同于

val sequence1 = (0..10).asSequence()
val sequence2 = sequence1.map { it+1 }
val sequence3 = sequence2.map { it+1 }
sequence3.toList()

最终,序列sequence3的结构持有如下:sequence3-> sequence2 -> sequence1。而它们都有各自的迭代器。迭代器里都重写了各自的变换逻辑:

1
2
3
4
5
6
7
8
9
override fun next(): R {
return transformer(iterator.next())
}

//由于这里都是执行的+1操作,所以变换逻辑transformer可以认为等同于如下操作:

override fun next(): R {
return iterator.next()+1
}

而当我们通过sequence3.toList执行代码时,它的流程如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public fun <T> Sequence<T>.toList(): List<T> {
return this.toMutableList().optimizeReadOnlyList()
}

public fun <T> Sequence<T>.toMutableList(): MutableList<T> {
//末端操作符,此处才会开始创建新的集合
return toCollection(ArrayList<T>())
}

public fun <T, C : MutableCollection<in T>> Sequence<T>.toCollection(destination: C): C {
//执行迭代器next操作
//当调用(末端操作符)走到这里时,便会和普通结合的操作符一样
//此时为新创建的集合赋值
for (item in this) {
destination.add(item)
}
return destination
}

经过几次扩展函数调用,最终在toCollection里开始执行迭代(Iterator的典型的操作),也就是获取了sequence3的iterator实例,并不断通过next取出数据。而在上文中的TransformingSequence源码里可以看到,TransformingSequence会持有上一个迭代器的实例(代码注释一)。

并且在迭代开始后,在进行transformer操作(也就是执行+1操作)前,会调用上一个迭代器的next方法进行迭代(代码注释二)。这样不断的迭代,最终,最终会调用到sequence1的next方法。再结合上文中的序列创建里的分析——sequence1里所持有的迭代器就是就是原始数据里的迭代器。

那么当最终执行toList方法时,它会循环sequence3.iterator方法。而在每次循环内,都会首先执行sequence3所持有的sequence2.iterator的next方法。sequence2依次类推执行到sequence1的sequence1.iterator`方法,最终执行到我们原始数组的迭代器next方法:

整个流程如下:

img

原理就是这么简单:中间操作符通过序列嵌套,实现对迭代器iterator的嵌套。这样在进行迭代的时候,会依次调用各个iterator迭代器直到调用到原始集合数据里的迭代器开始并返回元素。而当元素返回时,会依次执行各个迭代器持有变换操作方法实现对数据的变换。

img

而其他操作符,也是遵循这个基本的规则。无非就是增加一些其他的操作。

总结

  • 序列通过中间操作符对迭代器进行嵌套和复写,以此实现按元素操作执行变换;
  • 中间操作符只负责根据需求创建并嵌套迭代器,并不负责开启迭代器。以此实现惰性操作且不产生临时集合;
  • 末端操作符负责开启迭代,按照嵌套顺序执行迭代操作。依次获取操作后的数据,并且会创建新的集合用来存储最终数据;
  • 序列不是万能的,因为要引入新的对象。在带来惰性和顺序执行的优势时,这些对象必然会带来性能开销。所以要依需求在集合和序列之间进行选择,使用合适的方式进行迭代。