Starting with Sequence

Sequence is the lazy list implementation provided by Kotlin. For example, the following Fibonacci series is implemented with the sequence builder.

val fibonacci = sequence {
  var terms = Pair(0L, 1L)
  while (true) {
    yield(terms.first) // suspension point: control returns to the consumer
    terms = Pair(
      terms.second,
      terms.first + terms.second
    )
  }
}

The terminal operator

We use an infinite loop in the sequence builder block to compute the elements one after another and hand each of them to the consumer via yield. The consumers of a Sequence are the so-called terminal operators, such as forEach and sum.

fibonacci.take(10).forEach { println(it) }
fibonacci.take(10).sum()

In the case of forEach, each time the sequence block yields a new element, that element is passed to the forEach block for processing; control then returns to the sequence block, which runs on to compute the next value. A normal List, by contrast, computes all of its elements in memory up front. This is why a Sequence can represent an infinite series of elements. When consuming such a sequence, however, you must use an intermediate operator such as take to limit the number of elements consumed.
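
The back-and-forth between producer and consumer can be made visible by logging on both sides. The following is a minimal sketch using only the standard library; the function name traceInterleaving and the log format are made up for this illustration.

```kotlin
// Records the order in which producer and consumer code runs.
fun traceInterleaving(): List<String> {
    val log = mutableListOf<String>()
    val numbers = sequence {
        var n = 1
        while (true) {            // infinite producer
            log += "produce $n"
            yield(n)              // suspends until the consumer asks for the next element
            n++
        }
    }
    // take(3) bounds the infinite sequence; forEach is the terminal operator.
    numbers.take(3).forEach { log += "consume $it" }
    return log
}

fun main() {
    traceInterleaving().forEach(::println)
}
```

Each "produce" entry is immediately followed by the matching "consume" entry, and nothing is produced beyond the three elements that take lets through.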

The intermediate operator

Sequence has map and filter intermediate operators, just like List. Each call to an intermediate operator on a List returns a new List; a call to an operator on a Sequence is equivalent to building a pipeline of data. After an element is yielded, it travels through these defined data pipelines to the end consumer.

Without a terminal operator, we have only built the data pipeline; the code in the sequence builder never runs.

The following small example prints the string "AaBbCc":

sequence {
  yield("A".also { print(it) })
  yield("B".also { print(it) })
  yield("C".also { print(it) })
}.forEach {
  print(it.lowercase()) // lowercase() replaces the deprecated toLowerCase() since Kotlin 1.5
}
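
The claim that the builder does not run until a terminal operator is called can be checked with a counter. This is only a sketch; the function name builderRunsOnlyWhenConsumed and the counter are assumptions made for the illustration.

```kotlin
// Returns the number of produced elements before and after the terminal operator runs.
fun builderRunsOnlyWhenConsumed(): Pair<Int, Int> {
    var produced = 0
    val pipeline = sequence {
        for (i in 1..3) {
            produced++            // side effect inside the builder
            yield(i)
        }
    }.map { it * 2 }.filter { it > 2 }   // intermediate operators only describe the pipeline

    val beforeTerminal = produced        // pipeline built, nothing consumed yet
    pipeline.toList()                    // terminal operator pulls every element through
    val afterTerminal = produced
    return beforeTerminal to afterTerminal
}

fun main() {
    println(builderRunsOnlyWhenConsumed())
}
```

The first component stays at 0 because map and filter merely wrap the sequence; only toList drives the builder.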

Use Case: Find the First Non-Null Property

class Feedback(
  val rate: Int,
  val message: String?
)

val feedbacks = listOf(
  Feedback(4, null),
  Feedback(4, "Cool and clear function"),
  Feedback(3, null),
  Feedback(1, "Hey, it's too much"),
)

Suppose we want to find the first Feedback.message in the list of feedbacks that is not null, i.e. "Cool and clear function". There are several ways to do this.

// Method 1: creates an intermediate Collection
feedbacks
  .mapNotNull { it.message }
  .firstOrNull()

// Method 2: convert to a Sequence
feedbacks
  .asSequence()
  .mapNotNull { it.message }
  .firstOrNull()

// Method 3: standard library function since Kotlin 1.5
feedbacks.firstNotNullOfOrNull { it.message }

If the collection has many elements or has to pass through many operators, Method 1 creates a new collection for each transformation to store the intermediate results, which is wasteful. Sequence is the better choice in such scenarios.
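
To make the difference concrete, we can count how many elements each approach inspects before it returns. The sketch below redefines Feedback and the sample list so that it is self-contained; countInspected is a hypothetical helper.

```kotlin
class Feedback(val rate: Int, val message: String?)

val feedbacks = listOf(
    Feedback(4, null),
    Feedback(4, "Cool and clear function"),
    Feedback(3, null),
    Feedback(1, "Hey, it's too much"),
)

// Returns how many elements the eager and the lazy pipeline inspected.
fun countInspected(): Pair<Int, Int> {
    var eager = 0
    // List.mapNotNull transforms the whole list before firstOrNull looks at it.
    feedbacks.mapNotNull { eager++; it.message }.firstOrNull()

    var lazy = 0
    // The Sequence version stops as soon as the first non-null message is found.
    feedbacks.asSequence().mapNotNull { lazy++; it.message }.firstOrNull()

    return eager to lazy
}

fun main() {
    println(countInspected())
}
```

With this sample data the eager version touches all four elements, while the lazy version stops after the second one.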

Use Case: Flat Nested List Iterator

Suppose there is a nested list of integers. Design an iterator that iterates over all the integers in this integer list. The integer list is represented by this interface.

interface NestedInteger {
  fun isInteger(): Boolean
  fun getInteger(): Int?
  fun getList(): List<NestedInteger>?
}

Each item in the list is either an integer or another list, whose elements may in turn be integers or further lists. For example:

Input: [[1,1],2,[1,1]]
Output: [1,1,2,1,1]

If all the elements are stored in a list, the problem can be easily solved using recursion.

fun flatten(nestedList: List<NestedInteger>): List<Int> {
  val ans = mutableListOf<Int>()

  // Depth-first walk that appends every integer to ans
  fun walk(list: List<NestedInteger>) {
    for (item in list) {
      if (item.isInteger()) ans += item.getInteger()!!
      else walk(item.getList()!!)
    }
  }

  walk(nestedList)

  return ans
}

After getting the list, it can be turned into an iterator. But the iterator should be “lazy”, meaning that the consumer consumes the data while the iterator is still traversing the data source. The advantages of this include:

  • If the consumer only needs the first few pieces of data, and the data source is large, then pre-computing the entire List does a lot of useless work.
  • Consumers do not have to wait for the entire List to be computed before proceeding to the next step. The whole data processing process can be sped up by concurrency.

However, laziness makes direct recursion inconvenient: we have to maintain a stack by hand, which makes the originally simple code much more complicated (you can see the full code here).

With Sequence, we can implement a lazy iterator using the recursive algorithm.

// A separate function is needed so that it can call itself recursively
suspend fun SequenceScope<Int>.walk(
  list: List<NestedInteger>
) {
  for (item in list) {
    if (item.isInteger()) yield(item.getInteger()!!)
    else walk(item.getList()!!)
  }
}

sequence { walk(nestedList) }.iterator() // the iterator the problem asks for
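
To try this out, we need concrete NestedInteger values. The sketch below is self-contained: leaf, nested, lazyIterator and the visitedLeaves counter are hypothetical helpers added only for the demonstration, and the walk function is the recursive traversal described above with a counter added.

```kotlin
interface NestedInteger {
    fun isInteger(): Boolean
    fun getInteger(): Int?
    fun getList(): List<NestedInteger>?
}

// Hypothetical helpers for building test data; not part of the original interface.
fun leaf(value: Int): NestedInteger = object : NestedInteger {
    override fun isInteger() = true
    override fun getInteger(): Int? = value
    override fun getList(): List<NestedInteger>? = null
}

fun nested(vararg items: NestedInteger): NestedInteger = object : NestedInteger {
    override fun isInteger() = false
    override fun getInteger(): Int? = null
    override fun getList(): List<NestedInteger>? = items.toList()
}

var visitedLeaves = 0 // how many integers the traversal has reached so far

suspend fun SequenceScope<Int>.walk(list: List<NestedInteger>) {
    for (item in list) {
        if (item.isInteger()) {
            visitedLeaves++
            yield(item.getInteger()!!)   // hand one integer to the consumer, then pause
        } else {
            walk(item.getList()!!)       // plain recursion, no manual stack
        }
    }
}

fun lazyIterator(nestedList: List<NestedInteger>): Iterator<Int> =
    sequence { walk(nestedList) }.iterator()

fun main() {
    // Represents [[1,1],2,[1,1]]
    val input = listOf(nested(leaf(1), leaf(1)), leaf(2), nested(leaf(1), leaf(1)))
    val iterator = lazyIterator(input)
    println(iterator.next())
    println(iterator.next())
    println("integers visited so far: $visitedLeaves")
}
```

After two calls to next only two integers have been visited; the rest of the structure is traversed only if the consumer keeps asking.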

Flow: Suspendable Sequence

The example above has only pure computation in the data flow. In real scenarios we may need to perform time-consuming operations in intermediate or terminal operators (assume they are wrapped in suspend functions), such as calling remote APIs or reading from and writing to a database.

suspend fun search(string: String): String {/**/}
suspend fun saveToDB(string: String) {/**/}

// ❌ Does not compile:
// suspend functions cannot be called inside Sequence operators
sequenceOf("foo", "bar")
  .map { search(it) } // call the remote API
  .forEach { saveToDB(it) } // save to the database

In the above example, we want to search for "foo" and "bar" through the API in order and store each search result in the database, that is, in this order:

  • yield "foo"
  • search "foo"
  • saveToDB "foo"
  • yield "bar"
  • search "bar"
  • saveToDB "bar"

However, Sequence’s operators can only pass in regular (non-suspend) blocks, and cannot call suspend functions within them.

So we have Flow.

suspend fun search(string: String): String {/**/}
suspend fun saveToDB(string: String) {/**/}

scope.launch {
  flowOf("foo", "bar")
    .map { search(it) } // suspends: call the remote API
    .collect { saveToDB(it) } // suspends: save to the database
}

Like Kotlin coroutines in general, Flow executes sequentially by default, with no concurrency. In terms of execution order, the above example is equivalent to the following loop.

for (item in listOf("foo", "bar")) {
  val result = search(item) // suspension point
  saveToDB(result) // suspension point
}
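
The sequential order can be observed with kotlinx.coroutines by logging each step. In this sketch, search and saveToDB are stand-ins that take an extra log parameter (an assumption made for the demonstration), delay(10) pretends to be the network, and runPipeline is a hypothetical helper.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the remote call.
suspend fun search(query: String, log: MutableList<String>): String {
    log += "search $query"
    delay(10)                      // pretend to wait for the network
    return "result($query)"
}

// Hypothetical stand-in for the database write.
suspend fun saveToDB(result: String, log: MutableList<String>) {
    log += "save $result"
}

fun runPipeline(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf("foo", "bar")
        .onEach { log += "emit $it" }   // record when the upstream hands over a value
        .map { search(it, log) }
        .collect { saveToDB(it, log) }
    log
}

fun main() {
    runPipeline().forEach(::println)
}
```

Even though search suspends, "bar" is not emitted until "foo" has been searched and saved.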

The terminal operator collect is a suspend function, so consuming a Flow requires a coroutine (for example one launched from a CoroutineScope), whereas the terminal operators of Sequence are ordinary functions.

In fact, as with Sequence, the terminal operators of a Flow are the “driving force” of the entire data flow. With only intermediate operators such as map and filter chained together and no terminal operator, we have merely built a data pipeline: the code in the Flow builder will not run and no data will flow. This kind of Flow is called a “cold flow”. A useful way to think about such a Flow is to treat its definition as a function definition, and a terminal operator as a (suspend) function call:

val myFlow: Flow<Int> = flow {/**/}
  .map {/**/}
  • The code in the flow {} block and map {} will only run and the data will flow if the terminal operator is called on myFlow.
  • If collect is called twice on myFlow, it is like calling the same function twice.
  • myFlow contains a suspend block inside, and should be treated as a suspend function itself, and must provide a CoroutineScope when called.
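
The function-call analogy can be checked by counting how often the builder block runs. A small sketch with kotlinx.coroutines; countBuilderRuns and the counter are assumptions made for this illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns the builder run count before collecting, after one and after two collections.
fun countBuilderRuns(): List<Int> = runBlocking {
    var builderRuns = 0
    val myFlow: Flow<Int> = flow {
        builderRuns++              // executed once per collection
        emit(1)
        emit(2)
    }.map { it * 10 }

    val beforeCollect = builderRuns
    myFlow.toList()                // first "call" of the flow
    val afterFirst = builderRuns
    myFlow.toList()                // second "call" runs the builder again
    listOf(beforeCollect, afterFirst, builderRuns)
}

fun main() {
    println(countBuilderRuns())
}
```

The counter is still 0 after the flow is defined and grows by one with every terminal operator call, just like calling a function twice.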

Flow implements asynchrony with the suspend infrastructure provided by the Kotlin language, whereas RxJava has to express asynchrony in the data types that travel through the stream. For example, using RxJava for network requests:

fun callSearchApi(string: String)
  : Observable<SearchResult>

Observable.just("foo", "bar")
  .flatMap { callSearchApi(it) }
  .subscribe { println(it) }

Notice that the return value of callSearchApi has to be wrapped in an Observable, and the transformation in the data stream has to use flatMap. With coroutines and Flow, asynchronous functions need no wrapper type, and transformations in the data stream can use map directly, which is more natural.

suspend fun callSearchApi(string: String)
  : SearchResult

scope.launch {
  flowOf("foo", "bar")
    .map { callSearchApi(it) }
    .collect { println(it) }
}

// Or use `launchIn` to reduce nesting
flowOf("foo", "bar")
  .map { callSearchApi(it) }
  .onEach { println(it) }
  .launchIn(scope)

A Toy Flow

The design and implementation of Kotlin Flow is very simple and elegant, so let’s try to implement a minimalist toy version.

The Flow interface has only a single collect method, which takes a FlowCollector argument. FlowCollector is a typical consumer interface (like Comparator, it is declared with <in T>).

interface Flow<out T> {
  suspend fun collect(collector: FlowCollector<T>)
}

interface FlowCollector<in T> {
  suspend fun emit(value: T)
}

This collect method can be seen as the link between the upstream and downstream of the reactive data flow.

  • Upstream data sources, accessed via this.
  • Downstream consumers, accessed via collector.

Implementing collect

class MyFlow<T> : Flow<T> {
  override suspend fun collect(collector: FlowCollector<T>) {
    TODO()
  }
}

When we collect, we need to send the data to the FlowCollector for consumption, so where does this data come from? We can use the flow builder to create the flow:

flow {
  emit(1) // each emit is a suspension point
  emit(2)
  emit(3)
}.collect { // collect suspends until the flow completes
  println(it)
}

Hey, did you notice emit? It’s none other than the FlowCollector.emit method. So the flow builder function takes a block with FlowCollector as the receiver. Where does this FlowCollector come from? It’s passed in during Flow.collect.

This creates a closed loop: the emit we call in the suspend block of the flow builder is invoked on the downstream consumer that only appears later, when the Flow is collected. This reflects the “lazy” nature of Flow: creating a Flow does not compute any data, it only stores a suspend block; when the Flow is collected, we obtain the consumer FlowCollector and use it as the receiver to call the stored suspend block. This design makes particularly clever use of Kotlin’s lambda-with-receiver feature.

Putting the above analysis into code, we get our toy implementation.

class MyFlow<T>(
  private val builder: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
  override suspend fun collect(collector: FlowCollector<T>)
    = collector.builder()
}

To let the caller consume data with the collect {...} syntax, we can define an extension function:

// Implementation copied verbatim from the coroutine library
public suspend inline fun <T> Flow<T>.collect(
  crossinline action: suspend (value: T) -> Unit
): Unit = collect(object : FlowCollector<T> {
  override suspend fun emit(value: T) = action(value)
})
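
To watch the closed loop at work without depending on kotlinx.coroutines, the pieces so far can be assembled into a sketch that uses only the standard library. The names ToyFlow, ToyCollector, BuilderFlow, runToy and toyDemo are assumptions chosen to avoid clashing with the real library, and runToy is a deliberately limited runner that only works for blocks that never actually suspend.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

interface ToyCollector<in T> { suspend fun emit(value: T) }
interface ToyFlow<out T> { suspend fun collect(collector: ToyCollector<T>) }

// Stores the builder block; nothing runs until collect is called.
class BuilderFlow<T>(
    private val builder: suspend ToyCollector<T>.() -> Unit
) : ToyFlow<T> {
    override suspend fun collect(collector: ToyCollector<T>) = collector.builder()
}

// Lambda-friendly collect, mirroring the extension function shown above.
suspend inline fun <T> ToyFlow<T>.collect(crossinline action: suspend (T) -> Unit) =
    collect(object : ToyCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

// Minimal runner for suspend blocks that complete without suspending.
fun runToy(block: suspend () -> Unit) {
    var outcome: Result<Unit>? = null
    block.startCoroutine(Continuation(EmptyCoroutineContext) { outcome = it })
    checkNotNull(outcome) { "the block suspended; this runner cannot resume it" }.getOrThrow()
}

fun toyDemo(): List<String> {
    val log = mutableListOf<String>()
    val flow = BuilderFlow<Int> {
        log += "builder started"
        emit(1)                    // calls the collector supplied later by collect
        emit(2)
    }
    log += "flow created"
    runToy { flow.collect { log += "collected $it" } }
    return log
}

fun main() {
    toyDemo().forEach(::println)
}
```

"flow created" is logged before "builder started", which shows that the builder block is only stored at construction time and runs with the collector as its receiver once collect is called.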

Intermediate operators

An intermediate operator collects the upstream flow, transforms the upstream data, and sends the transformed data to a new flow. For example, we can implement map like this:

fun <T, R> Flow<T>.map(block: suspend (value: T) -> R) =
  MyFlow<R> {
    collect { emit(block(it)) }
  }

Like Sequence and Iterable, Flow’s intermediate operators are all extension functions. This allows Flow’s interface to have only one method, keeping it lean and meanwhile making it easy for users to customize operators. There is no difference between calling custom operators and the standard library’s own operators, unlike RxJava, which requires additional APIs like compose or lift (see Implementing Custom Operators in RxJava).

The pattern “transform the upstream data and return a new Flow” is so common that the Kotlin library provides a transform method to implement map, filter, and many other operators. It is also recommended to use it when defining your own operators.

// Implementation of Flow.map in the Kotlin coroutine library
public inline fun <T, R> Flow<T>.map(
  crossinline transform: suspend (value: T) -> R
): Flow<R> = transform { value ->
   return@transform emit(transform(value))
}
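
A custom operator built on transform might look like the following sketch. The operator parseInts and the helper parseIntsDemo are hypothetical; transform, flowOf, map and toList are the real kotlinx.coroutines functions.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical custom operator: keeps only the values that parse as Int.
fun Flow<String>.parseInts(): Flow<Int> = transform { text ->
    val number = text.toIntOrNull()
    if (number != null) emit(number)   // emit zero or one value per upstream element
}

fun parseIntsDemo(): List<Int> = runBlocking {
    flowOf("1", "x", "3")
        .parseInts()                   // called exactly like a built-in operator
        .map { it * 2 }
        .toList()
}

fun main() {
    println(parseIntsDemo())
}
```

Because parseInts is an ordinary extension function, it chains with map and the other library operators without any extra adapter.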

Full code

class MyFlow<T>(
  private val builder: suspend FlowCollector<T>.() -> Unit
) : Flow<T> {
  override suspend fun collect(collector: FlowCollector<T>) = collector.builder()
}

fun <T, R> Flow<T>.map(block: suspend (value: T) -> R) = MyFlow<R> {
  collect { emit(block(it)) }
}

fun <T> Flow<T>.filter(block: suspend (value: T) -> Boolean) = MyFlow<T> {
  collect { if (block(it)) emit(it) }
}

suspend fun main() = MyFlow<Int> { emit(1); emit(2) }
  .map { it * it }
  .filter {
    delay(1000) // suspension point (kotlinx.coroutines.delay)
    it % 2 == 0
  }
  .collect { println("collected $it") }

As you can see, with Kotlin’s existing suspend infrastructure, a reactive data flow with asynchronous support takes only a handful of lines. In fact, stripped down to its core, the Flow in the Kotlin coroutine library is not very different from our toy implementation, but it provides two additional guarantees: context preservation and exception transparency.

Additional guarantees for Flow

Context preservation

RxJava can switch threads with the observeOn and subscribeOn operators. But given an Observable, we cannot tell from the function signature alone which thread the consumer will run on, so we usually call observeOn(mainThread) manually. Sometimes a project adds the switch to the main thread in a global location (e.g. Retrofit’s call adapter), but at the actual call site you may be unsure, or you may add observeOn(mainThread) out of habit, so the thread gets switched again and again.

Flow provides a guarantee of “context preservation”: Flow ensures at runtime that the upstream cannot change the downstream context. In other words, the thread on which a Flow is consumed depends on the CoroutineContext of the collect call. What you see is what you get: the code runs where you collect. Suppose we get a Flow from some API.

fun magicFlow(): Flow<String> = flow {/**/}

lifecycleScope.launch {
  magicFlow().collect {
    // Always runs on the main thread
    uiBinding.label.text = it
  }
}

This magicFlow may internally switch parts of its work to a background thread, but those are implementation details that the caller need not care about. The caller wants to consume the data on the main thread in order to update the UI. androidx provides lifecycleScope, whose coroutines use the main thread as their dispatcher. By calling the suspend collect method inside a coroutine launched from lifecycleScope, we make sure that the collect block runs on the main thread.

This design is in line with Kotlin coroutines in general: where a suspend function executes depends on the CoroutineContext of its caller and is entirely under the caller’s control. A suspend function may internally switch to other threads (for example, in I/O scenarios where blocking the main thread must be avoided), but the caller does not need to care. In the same way, the details of thread switching are almost transparent to the caller of a Flow. When Android client code consumes a Flow-returning API to update the UI, it can simply collect the Flow on the main thread, without switching threads manually.

Looking back, our toy implementation provides no such guarantee:

fun magicFlow() = MyFlow<Int> {
  withContext(Dispatchers.IO) {
    emit(1)
  }
}

suspend fun main() {
  magicFlow().collect {
    // Will be dispatched to Dispatchers.IO
    println(coroutineContext)
  }
}

In the above example, we emit data inside a block that has switched the coroutine dispatcher. Recall that emit is really a call to the lambda passed to collect, so the upstream flow’s internal implementation “secretly” switches the downstream caller’s CoroutineContext, making it hard for the caller to tell at a glance in which CoroutineContext the code consuming the flow will run.

So, the Kotlin library’s Flow implementation checks that emit and collect are executed in the same Coroutine, otherwise it will simply throw an exception.

Exception in thread "main" java.lang.IllegalStateException:
Flow invariant is violated
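
The check can be reproduced with the real library. In this sketch, emitFromOtherContext is a hypothetical helper that captures the exception instead of letting it crash the program; Dispatchers.Default is used so that the example runs on any JVM.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Returns the exception thrown when a flow emits from inside withContext, or null if none.
fun emitFromOtherContext(): Throwable? = runCatching {
    runBlocking {
        flow<Int> {
            withContext(Dispatchers.Default) {
                emit(1)            // emission from a different coroutine than the collector's
            }
        }.toList()
    }
}.exceptionOrNull()

fun main() {
    println(emitFromOtherContext())
}
```

The returned exception is an IllegalStateException whose message begins with "Flow invariant is violated".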

Note that what Flow prohibits is emitting data from a different coroutine; switching contexts inside a flow block is still allowed, as in the following example.

suspend fun main() {
  val f = flow {
    emit(1)
    val value = withContext(Dispatchers.IO) { 2 }
    emit(value)
  }

  f.collect {
    println(it)
  }
}

Given the design of Kotlin coroutines, this is necessarily allowed. Imagine extracting the withContext expression into a suspend function that is called from within the flow builder: any context switch inside a suspend function is transparent to its caller.

Exception transparency

Another guarantee of Flow is exception transparency. However, in my current opinion, throwing exceptions is not recommended when using Kotlin coroutines. If you are interested in this part, see the official documentation:

Asynchronous Flow - Exception Transparency

Flow in Android client application

To summarize, the advantages of Flow over RxJava include:

  • Relying on Kotlin’s suspend infrastructure, the design and implementation are simpler and more elegant, and the operators compose better.
  • Operators are defined as extension functions, so custom operators are called in the same way as the coroutine library’s own Flow operators.
  • Additional guarantees such as context preservation are provided, continuing the coroutine design idea of making the details of thread switching almost completely transparent.

However, on the Android client, Kotlin coroutines (suspend functions) are sufficient for most asynchronous scenarios. RxJava became very popular in the Android community mainly because it solved the pain of thread switching, a problem that Kotlin coroutines already solve very elegantly.

Currently, more and more APIs in Android Jetpack use Flow, such as DataStore, Room, Paging 3 and so on. Establishing a proper understanding of Flow will enable better use of these libraries.

In addition, RxAndroid wraps common Android components as data sources, which is convenient for functional reactive programming and works well in some simple scenarios, such as debouncing the user’s input and then calling an asynchronous API. With Flow, you can use ReactiveCircus/FlowBinding for this.

lifecycleScope.launch {
  binding.editText.textChanges()
    .debounce(300)
    // Cancelled automatically according to the LifecycleOwner's lifecycle
    .flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
    .map {
      callSearchApi(it) // suspension point
    }
    .collectLatest {
      updateUi(it)
    }
}

The corresponding RxJava version:

RxTextView.textChanges(binding.editText)
  .debounce(300, TimeUnit.MILLISECONDS)
  .switchMap {
    callApi()
  }
  .subscribeOn(io())
  .observeOn(mainThread())
  .subscribe {
    updateUi(it)
  }

Notice that the Flow version’s terminal operator is collectLatest. Since both the producer and the consumer of a Flow can suspend, the producer may emit new data while the consumer is suspended processing an earlier element; in that case the *Latest family of operators cancels the coroutine block handling the previous element. This behavior is similar to switchMap in RxJava. In client-side scenarios, collectLatest seems to be the right choice in most cases.
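
The cancelling behavior can be seen in a small experiment. In this sketch the three strings stand in for quick keystrokes and delay(100) for a slow API call; latestOnly is a hypothetical helper.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun latestOnly(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flowOf("k", "ko", "kot")               // three values emitted back to back
        .collectLatest { query ->
            log += "start $query"
            delay(100)                     // slow work; cancelled when a newer value arrives
            log += "done $query"
        }
    log
}

fun main() {
    latestOnly().forEach(::println)
}
```

Only the block handling the last value reaches its "done" line; the blocks for the earlier values are cancelled while they are suspended in delay.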

But doesn’t Flow execute sequentially, with the producer waiting for the consumer? It does, but the *Latest family of operators uses an additional channel internally, which keeps listening to the upstream and reacting while the downstream is suspended. Such a Flow always has an “active” part inside, which differs from the “cold flow” described in this article and is a topic for another article.