此文是对Kotlin官方关于协程的一个教程翻译,原文:https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/01_Introduction
在这个动手教程中,我们将熟悉协程的概念。协程为我们提供了异步和非阻塞行为的所有好处,但又不缺乏可读性。我们将看到如何使用协程执行网络请求而不阻塞底层线程和不使用回调。
我们将学习:
我们还将讨论协程与其他现有解决方案的不同之处。
对于网络请求,我们将使用Retrofit库,该库已经支持协程,但是本教程中显示的方法是通用的,并且对于不同的库的工作方式相似。
本教程基于Roman Elizarov在KotlinConf的研讨会的“使用 Kotlin 进行异步编程”。
在IntelliJ的开始屏幕中通过Get from VCS克隆存储库:http://github.com/kotlin-hands-on/intro-coroutines
该项目包括solutions分支,因此可以使用该Compare with branch...操作将您的解决方案与建议的解决方案进行比较。
生成 GitHub 开发者令牌
我们将使用 GitHub API。为此,您需要指定您的 Github 帐户名称以及密码或令牌。如果你在 GitHub 上启用了双因素身份验证,那么就只能使用令牌的方式了。
您可以在此处从您的帐户生成新的GitHub令牌以使用GitHub API:https://github.com/settings/tokens/new. 指定令牌的名称,例如coroutines-hands-on:
无需选择任何范围,直接点击屏幕底部的“Generate token”即可生成令牌,如下:
这里敢把token截图出来,是因为token默认有效期是30天,而且我们没有勾选任何范围,即该token的权限很小,不怕!把token复制出来保存好,后面会用到它。
运行代码
打开src/contributors/main.kt文件并运行main函数,如下:
在这里输入我们的Github账号和Token,然后点击Load contributors,如下:
在Variant处,我们选的是BLOCKING的方式,当点击“Load contributors”按钮时,它会以阻塞的方式进行加载数据,加载的是给定组织下所有存储库的贡献者,默认情况下,组织是“kotlin”。因为是以阻塞的方式进行加载的,所以加载的时候UI会被卡住一段时间,然后显示贡献者列表。
我们将比较实现此逻辑的不同方式,并了解协程如何改变事物。在开始使用协程之前,我们先来看看还有哪些其他解决方案。首先,我们将看看如何以阻塞的方式实现逻辑:易于阅读和推理,但由于它冻结了 UI,因此是错误的。然后我们将使用回调来修复它,并将这些解决方案与使用协程的解决方案进行比较。最后,我们将看到如何使用 Channels 在不同的协程之间共享信息。
让我们开始!
我们将使用开发人员GitHub API,它会在您的帐户下执行请求,并使用您提供的账号和身份验证令牌。
我们将使用Retrofit向GitHub执行HTTP请求。它允许我们请求给定组织下的存储库列表,以及每个存储库的贡献者列表:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
fun getOrgReposCall(
@Path("org") org: String
): Call<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
fun getRepoContributorsCall(
@Path("owner") owner: String,
@Path("repo") repo: String
): Call<List<User>>
}
使用src/tasks/Request1Blocking.kt中定义的loadContributorsBlocking函数来获取给定组织的贡献者列表。让我们看看它是如何实现的:
fun loadContributorsBlocking(service: GitHubService, req: RequestData) : List<User> {
val repos = service
.getOrgReposCall(req.org) // #1
.execute() // #2
.also { logRepos(req, it) } // #3
.body() ?: listOf() // #4
return repos.flatMap { repo ->
service
.getRepoContributorsCall(req.org, repo.name) // #1
.execute() // #2
.also { logUsers(repo, it) } // #3
.bodyList() // #4
}.aggregate()
}
首先,我们获取给定组织下的存储库列表并将其存储在repos列表中。然后对于每个存储库,我们请求贡献者列表并将所有这些列表合并为一个最终的贡献者列表。
getOrgReposCall和getRepoContributorsCall各自返回一个Call类的实例给我们(#1),此时,没有发送请求,当调用Call.execute的时候才开始发送请求(#2)。execute是阻塞底层线程的同步调用。
当我们得到response时,我们通过调用特定的logRepos()和logUsers()函数(#3)记录结果。如果HTTP response包含错误,则会在此处记录此错误。
最后,我们需要获取response的body,其中包含所需的数据。在本教程中为简单起见,在出现错误的时候,我们将使用一个空列表作为结果,并记录相应的错误 (#4)。为了避免.body() ?: listOf()一遍又一遍地重复,我们声明了一个扩展函数bodyList:
fun <T> Response<List<T>>.bodyList(): List<T> {
return body() ?: listOf()
}
logRepos和logUsers立即记录收到的信息。运行代码时,查看系统输出,它应该显示如下内容:
2504 [AWT-EventQueue-0] INFO Contributors - kotlin: loaded 76 repos
2851 [AWT-EventQueue-0] INFO Contributors - kotlin-examples: loaded 29 contributors
3208 [AWT-EventQueue-0] INFO Contributors - ts2kt: loaded 11 contributors
3559 [AWT-EventQueue-0] INFO Contributors - kotlin-koans: loaded 43 contributors
...
每行日志的第一项是自程序启动以来经过的毫秒数,然后是方括号中的线程名称,我们可以看到加载请求是从哪个线程调用的。每行的最后一项是实际消息:加载了多少存储库或贡献者。
此日志表明所有结果都是从主线程记录的。当我们在BLOCKING的方式下运行代码时,我们会发现窗口会冻结,直到加载完成才会对输入做出反应。所有请求都从我们调用loadContributorsBlocking的同一个线程执行,这是主 UI 线程(在 Swing 中,它是一个 AWT 事件调度线程)。这个主线程被阻塞了,这就解释了为什么 UI 被冻结了:
加载贡献者列表后将更新结果。如果我们看一下loadContributorsBlocking是如何调用的,我们会发现updateResults在loadContributorsBlocking之后被调用:
val users = loadContributorsBlocking(service, req)
updateResults(users, startTime)
可以在src/contributors/Contributors.kt中找到此代码,loadContributors函数负责选择加载贡献者的方式。updateResults是一个更新UI的函数,因此,必须始终从 UI 线程调用它。
要熟悉任务域(稍后我们将需要它),请完成以下简单任务。目前,如果我们运行代码,我们可以看到每个贡献者的名字都重复了好几次,他们参与的每个项目都重复一次。这个任务是实现aggregate函数,用于完成用户组合的功能,使每个贡献者只添加一次。该属性应包含给定用户对所有User.contributions项目的贡献总数。结果列表应根据贡献的数量按降序排序。
Task(任务)
打开src/tasks/Aggregation.kt,然后实现List<User>.aggregate()函数。相应的测试文件test/tasks/AggregationKtTest.kt显示了预期结果的示例。您可以在源代码和测试类之间自动跳转使用 IntelliJ IDEA 快捷方式。打开打开src/tasks/Aggregation.kt,代码如下:
fun List<User>.aggregate(): List<User> = this
User类源代码如下:
@Serializable
data class User(
val login: String,
val contributions: Int
)
login属性用于保存用户的github账号,contributions属性用于保存该账号在某个组织下的贡献数量。所以我们要实现的就是把一个用户在所有组织的贡献数量合起来保存在一个User对象中即可。实现如下:
fun List<User>.aggregate(): List<User> =
this.groupBy { user -> user.login } // 按登录账号进行分组
.map { entry -> // 把每一个分组映射成一个User对象
User(entry.key, entry.value.sumOf { user -> user.contributions })
}
.sortedByDescending { user -> user.contributions } // 按贡献数量降序排序
执行此任务后,kotlin组织的结果列表应类似于以下内容:
我们也可以运行test/tasks/AggregationKtTest.kt的testAggregation()测试函数,看看我们实现的aggregate()函数是否能通过测试,testAggregation()测试函数如下:
class AggregationKtTest {
@Test
fun testAggregation() {
val actual = listOf(
User("Alice", 1), User("Bob", 3),
User("Alice", 2), User("Bob", 7),
User("Charlie", 3), User("Alice", 5)
).aggregate()
val expected = listOf(
User("Bob", 10),
User("Alice", 8),
User("Charlie", 3)
)
Assert.assertEquals("Wrong result for 'aggregation'", expected, actual)
}
}
运行该测试,将会显示测试通过,如下:
在后台线程调用loadContributors:
thread {
loadContributorsBlocking(service, req)
}
现在所有的加载都转移到了一个单独的线程,主线程是空闲的,可以被不同的任务占用:
loadContributors函数的签名发生了变化,它需要一个updateResults回调作为最后一个参数,以便在所有加载完成后调用它:
fun loadContributorsBackground(service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit)
现在,当loadContributorsBackground调用时,updateResults在回调中调用,而不是像以前那样立即进行:
loadContributorsBackground(req) { users ->
SwingUtilities.invokeLater {
updateResults(users, startTime)
}
}
通过调用SwingUtilities.invokeLater来确保updateResults更新结果的调用发生在主UI线程(AWT事件调度线程)上。
但是,如果我们尝试通过BACKGROUND选项加载贡献者,我们可以看到列表已更新,但没有任何变化。
任务
修复src/tasks/Request2Background.kt的loadContributorsBackground()函数,以便结果列表显示在 UI 中,如下:
fun loadContributorsBackground(service: GitHubService, req: RequestData, updateResults: (List<User>) -> Unit) {
thread {
val userList = loadContributorsBlocking(service, req)
updateResults(userList)
}
}
使用Retrofit回调API
我们已将整个加载逻辑移至后台线程,但这仍然不是资源的最佳使用。所有的加载请求一个接一个地依次进行,在等待加载结果的过程中,线程被阻塞,但它可能被其他一些任务占用。具体来说,它可以开始另一个加载,以便更早地收到整个结果!
处理每个存储库的数据应分为两部分:首先是加载,然后是处理结果响应。第二个“处理”部分应该被提取到回调中。然后可以在收到前一个存储库的结果之前启动每个存储库的加载(并调用相应的回调):
Retrofit回调API可以帮助实现这一点。我们将使用Call.enqueue函数来启动HTTP请求并将一个回调作为参数。在这个回调中,我们需要指定每次请求后需要做什么。
src/tasks/Request3Callbacks.kt里面的loadContributorsCallbacks()使用这个API。为方便起见,我们把onResponse扩展函数声明在相同的文件中。它将 lambda 作为参数而不是对象表达式。
fun loadContributorsCallbacks(service: GitHubService, req: RequestData,
updateResults: (List<User>) -> Unit) {
service.getOrgReposCall(req.org).onResponse { responseRepos -> // #1
logRepos(req, responseRepos)
val repos = responseRepos.bodyList()
val allUsers = mutableListOf<User>()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers -> // #2
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
}
}
// TODO: Why this code doesn't work? How to fix that?
updateResults(allUsers.aggregate())
}
}
我们将处理响应的逻辑提取到回调中:相应的 lambda 在#1和#2。
但是,提供的解决方案不起作用。如果我们运行程序并选择CALLBACKS选项来加载贡献者,我们可以看到没有显示任何内容。但是,立即返回结果的测试通过了。为什么?
花一些时间思考为什么给定的代码不能按预期工作,然后继续阅读。
任务(可选)
重写代码,以便显示加载的贡献者列表。
解决方案(第一次尝试)
我们同时启动了许多请求,这让我们减少了总加载时间。但是,我们没有等待加载的结果。我们的updateResults在启动所有加载请求后立即调用回调,此时allUsers列表尚未填充数据。
我们可以尝试通过对代码进行以下更改来解决此问题:
val allUsers = mutableListOf<User>()
for ((index, repo) in repos.withIndex()) { // #1
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (index == repos.lastIndex) { // #2
updateResults(allUsers.aggregate())
}
}
}
在行#1中,我们使用索引遍历repos列表。然后从每个回调中,我们检查我们是否在最后一次迭代(#2)。如果是这种情况,我们会更新结果。
但是,此代码也是不正确的。为什么?问题的根源是什么?花一些时间尝试找到这个问题的答案,然后继续阅读。
解决方案(第二次尝试)
由于加载请求是同时启动的,因此没有人保证最后一个的结果是最后一个。结果可以按任何顺序出现。因此,如果我们使用当前索引与lastIndex作为完成条件的比较,我们可能会丢失某些回调的结果。如果处理最后一个repo的请求返回的速度比一些先前的请求更快(这很可能发生),则需要更多时间处理的请求的所有结果都将丢失。
解决此问题的方法之一是引入索引并检查我们是否已经处理了所有存储库:
val allUsers = Collections.synchronizedList(mutableListOf<User>())
val numberOfProcessed = AtomicInteger()
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
logUsers(repo, responseUsers)
val users = responseUsers.bodyList()
allUsers += users
if (numberOfProcessed.incrementAndGet() == repos.size) {
updateResults(allUsers.aggregate())
}
}
}
请注意,我们使用的是同步版本的list和AtomicInteger,因为通常不能保证getRepoContributors总是从同一个线程调用不同的回调处理请求。
解决方案(第三次尝试)
更好的解决方案是使用CountDownLatch类。CountDownLatch存储一个计数器,我们用存储库的数量初始化它。我们在处理完每个存储库后递减这个计数器,然后等到latch倒计时到零,然后再更新结果:
val countDownLatch = CountDownLatch(repos.size)
for (repo in repos) {
service.getRepoContributorsCall(req.org, repo.name).onResponse { responseUsers ->
// processing repository
countDownLatch.countDown()
}
}
countDownLatch.await() // 阻塞,直到倒计时为0。
updateResults(allUsers.aggregate())
您可以看到,使用回调编写正确的代码可能并非易事且容易出错,尤其是当有多个底层线程并且发生同步时。接下来,我们将讨论如何使用suspend函数来实现相同的逻辑。
关于使用 RxJava 的注意事项
由于我们不希望本教程的所有读者都熟悉RxJava库,因此我们不会详细描述使用RxJava的版本。但是,如果您有兴趣,可以将其用作附加练习,并使用反应式方法实现相同的逻辑。使用RxJava所需的所有依赖项和解决方案都可以在项目中单独的“rx”分支中找到。也可以阅读本教程直到最后,然后实施或检查建议的Rx版本以进行适当的比较。
Retrofit 为协程提供了原生支持,我们将使用它。替换返回类型Call<List<Repo>>为List<Repo>,并把函数声明为suspend:
interface GitHubService {
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): List<Repo>
}
它背后的主要思想是,当我们使用suspend函数执行请求时,底层线程不会被阻塞。稍后我们将讨论它是如何工作的。
请注意,现在getOrgRepos直接返回结果而不是返回Call。如果结果不成功,则抛出异常。
或者,Retrofit也允许返回包装在Response的结果. 在这种情况下,会提供body result,并且可以手动检查错误。在这个教程中我们将使用Response的版本。
请将以下声明添加到GitHubService:
interface GitHubService {
// getOrgReposCall & getRepoContributorsCall declarations
@GET("orgs/{org}/repos?per_page=100")
suspend fun getOrgRepos(
@Path("org") org: String
): Response<List<Repo>>
@GET("repos/{owner}/{repo}/contributors?per_page=100")
suspend fun getRepoContributors(
@Path("owner") owner: String,
@Path("repo") repo: String
): Response<List<User>>
}
如果从loadContributorsBlocking调用suspend函数,将会报错:“suspend函数’getOrgRepos’ 只能被一个协程或另一个挂suspend数调用”。因此,我们需要将新版本的loadContributors函数标记为suspend以便使用这个新的 API。
现在请执行以下任务,然后我们将讨论suspend函数的工作原理以及它们与常规函数的不同之处。我们还将看到如何从非挂起函数调用suspend函数。
任务
复制loadContributorsBlocking的实现(定义在src/tasks/Request1Blocking.kt)到loadContributorsSuspend(定义在src/tasks/Request4Suspend.kt)。然后以某种方式对其进行修改,以便使用新的挂起函数而不是返回Call的函数。运行选择该SUSPEND选项的程序,并确保在执行GitHub请求时UI仍然响应。
解决方案
修改代码很简单。只需替换getOrgReposCall(req.org).execute()为getOrgRepos(req.org),并为“contributors”请求执行相同的替换。就这样!其他一切都保持不变:
suspend fun loadContributorsSuspend(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
return repos.flatMap { repo ->
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}.aggregate()
}
注意:loadContributorsSuspend应该定义为一个suspend函数。
我们不再需要调用execute来返回了Response,因为现在 的API函数直接返回Response。但这是特定于Retrofit库的实现细节。使用其他库,API会有所不同,但概念是相同的:
我们的suspend函数代码看起来与“阻塞”版本惊人地相似。它是可读的,并且准确地表达了我们想要实现的目标。
与阻塞版本的主要区别在于,我们挂起协程而不是阻塞线程:
block -> suspend (阻塞变成了挂起)
thread -> coroutine (线程变成了协程)
协程通常被称为轻量级线程。这意味着我们可以在协程上运行代码,就像在线程上运行代码一样。之前阻塞的操作(因此必须避免),现在可以用挂起协程来替换。
我们如何启动一个新的协程?如果我们看一下我们是如何调用loadContributorsSuspend的,我们可以看到我们在launch的里面调用的它。launch是一个以 lambda 作为参数的库函数:
launch {
val users = loadContributorsSuspend(req)
updateResults(users, startTime)
}
launch开启一个新的计算。这个计算负责加载数据并显示结果。这个计算是可暂停的(suspendable):在执行网络请求时,它被“暂停”并释放底层线程。当网络请求返回结果时,恢复计算。
这种可暂停的计算称为协程,因此,我们将简单地说,在这种情况下,launch启动一个新的协程,负责加载数据并显示结果。
协程是在线程之上运行并且可以暂停的计算。当协程“暂停”时,相应的计算会暂停,从线程中移除,并存储在内存中。同时,线程可以自由地被其他活动占用:
当计算准备好继续时,它会返回到一个线程(但不一定是同一个线程)。
让我们回到我们的loadContributorsSuspend例子。现在,每个“contributors”请求都通过挂起机制等待结果。首先,发送新的请求。然后,在等待响应时,整个“load contributors”协程被挂起(我们上面讨论过的那个是由launch函数启动的)。协程只有在收到相应的响应后才会恢复:
在等待接收响应时,线程可以自由地被其他任务占用。这就是为什么当用户通过该COROUTINE选项加载时,UI 保持响应,尽管所有请求都发生在主 UI 线程上。日志确认所有请求都在主 UI 线程上发送:
2538 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos
2729 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - ts2kt: loaded 11 contributors
3029 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-koans: loaded 45 contributors
...
11252 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin-coroutines-workshop: loaded 1 contributors
该日志还可以向您显示相应代码在哪个协程上运行。要启用它,请打开Run | Edit configurations…并添加VM 选项:-Dkotlinx.coroutines.debug,如下:
设置了这个之后,当使用这个选项运行main函数时,协程名称将附加到线程名称,如:[AWT-EventQueue-0 @coroutine#1],如果没有加这个参数运行的话,显示的线程名称是这样的:[AWT-EventQueue-0]。我们还可以修改运行所有 Kotlin 文件的模板并默认启用此选项。
在我们的例子中,所有代码都运行在一个协程上,即上面提到的“load contributors”协程,表示为@coroutine#1。
在这个版本中,在等待结果的同时,我们不会重用线程来发送其他请求,因为我们已经按顺序编写了代码。只有在收到之前的结果时才会发送新的请求。suspend函数公平地对待线程,不会因为“等待”而阻塞它,但它还没有给图片带来任何并发性。让我们看看如何改进这一点。
与线程相比,Kotlin 协程非常便宜。每次我们想要异步启动一个新的计算时,我们都可以创建一个新的协程。
要启动一个新的协程,我们可以使用主要的“协程构建器”之一:launch、async或runBlocking. 不同的库可以定义额外的协程构建器。
async启动一个新的协程并返回一个Deferred对象。Deferred表示一个以其他名称已知的概念,例如Future或Promise:它存储计算,但它会延迟您获得最终结果的那一刻;它承诺在未来的某个时候得到结果。
async和launch之间的主要区别在于launch用于启动预计不会返回特定结果的计算。launch返回Job,代表协程。可以通过调用Job.join()来等到它完成。
Deferred是继承自Job的泛型类型。async调用可以返回Deferred<Int>或Deferred<CustomType>,这取决于 lambda 返回的内容(lambda 中的最后一个表达式是结果)。
为了获得协程的结果,我们调用Deferred实例的await()。在等待结果时,这个协程的await是调用的,挂起的:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferred: Deferred<Int> = async(Dispatchers.Default) {
loadData()
}
log("waiting...")
log("result = ${deferred.await()}")
}
suspend fun loadData(): Int {
log("loading...")
delay(1000L)
log("loaded!")
return 42
}
fun log(message: String) {
println("${Thread.currentThread().name},$message")
}
运行结果如下:
main @coroutine#1,waiting...
DefaultDispatcher-worker-1 @coroutine#2,loading...
DefaultDispatcher-worker-1 @coroutine#2,loaded!
main @coroutine#1,result = 42
runBlocking用于常规函数和suspend函数之间、阻塞和非阻塞世界之间的桥梁。它用作启动顶级主协程的适配器,主要用于main函数和测试。
要更好地了解此示例中发生的情况,请观看以下视频:
https://www.youtube.com/watch?v=zEZc5AmHQhk
如果有deferred对象列表,则可以调用awaitAll以等待它们所有的结果:
import kotlinx.coroutines.*
fun main() = runBlocking {
val deferreds: List<Deferred<Int>> = (1..3).map {
async {
delay(1000L * it)
println("Loading $it")
it
}
}
val sum = deferreds.awaitAll().sum()
println("$sum")
}
在新协程中启动每个“贡献者”请求时,所有请求都是异步启动的。在收到前一个请求的结果之前,可以发送一个新请求:
这给我们带来了与之前版本大致相同的总加载时间CALLBACKS。但不需要任何回调。更重要的是,async在代码中明确强调了哪些部分同时运行。
任务
在Request5Concurrent.kt文件中实现一个loadContributorsConcurrent函数。使用上一个loadContributorsSuspend函数。
提示
正如我们将在下面讨论的,我们只能在协程范围内启动一个新的协程。因此,将loadContributorsSuspend内容复制到coroutineScope中调用,以便我们可以在那里调用async函数:
suspend fun loadContributorsConcurrent(
service: GitHubService, req: RequestData
): List<User> = coroutineScope {
// ...
}
解决方案基于以下方案:
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
// load contributors for each repo
}
}
deferreds.awaitAll() // List<List<User>>
解决方案
我们将每个“贡献者”请求用async包裹起来. 这将创建与我们拥有的存储库数量一样多的协程。但是由于创建一个新的协程真的很便宜,所以这不是问题。我们可以根据需要创建任意数量。
async返回Deferred<List<User>>。我们不能再使用flatMap:这个map结果现在是一个Deferred对象列表,而不是列表的列表。awaitAll()返回List<List<User>>,所以我们只需要调用flatten().aggregate()即可获得结果:
suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val deferreds: List<Deferred<List<User>>> = repos.map { repo ->
async {
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
}
deferreds.awaitAll().flatten().aggregate()
}
运行代码并查看日志;我们可以看到所有的协程仍然在主 UI 线程上运行。我们还没有以任何方式使用多线程,但我们已经获得了并发运行协程的好处!
我们很容易更改此代码以在来自公共线程池的不同线程上运行“贡献者”协程。指定Dispatchers.Default为async函数的上下文参数:
async(Dispatchers.Default) { ... }
CoroutineDispatcher确定应该在哪个或哪些线程上运行相应的协程。如果我们不指定一个作为参数,那么async将使用外部范围的调度程序。
Dispatchers.Default表示 JVM 上的共享线程池。这个池提供了一种并行执行的方法。它由与可用的 CPU 内核一样多的线程组成,但如果只有一个内核,它仍然有两个线程。
修改loadContributorsConcurrent函数中的代码,以便在公共线程池的不同线程上启动新的协程。此外,在发送请求之前添加额外的日志记录:
async(Dispatchers.Default) {
log("starting loading for ${repo.name}")
service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
}
引入此更改后,我们可以在日志中观察到每个协程可以在线程池中的一个线程上启动并在另一个线程上恢复:
1946 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
1946 [DefaultDispatcher-worker-3 @coroutine#5] INFO Contributors - starting loading for dokka
1946 [DefaultDispatcher-worker-1 @coroutine#3] INFO Contributors - starting loading for ts2kt
...
2178 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors
2569 [DefaultDispatcher-worker-1 @coroutine#5] INFO Contributors - dokka: loaded 36 contributors
2821 [DefaultDispatcher-worker-2 @coroutine#3] INFO Contributors - ts2kt: loaded 11 contributors
例如,在此日志摘录中,coroutine#4在worker-2线程上启动并在worker-1线程上继续。
要仅在主 UI 线程上运行协程,我们应该指定Dispatchers.Main为参数:
launch(Dispatchers.Main) {
updateResults()
}
如果当我们在主线程上启动一个新的协程时主线程很忙,协程就会被挂起并安排在这个线程上执行。协程只会在线程空闲时恢复。
请注意,从外部范围使用调度程序而不是在每个端点上显式指定它被认为是一种很好的做法。在我们的例子中,当我们定义的loadContributorsConcurrent没有传递Dispatchers.Default作为参数的情况下时,我们可以以任何方式调用此函数:在具有一个默认调度程序的上下文中,在具有主 UI 线程的上下文中,或在具有自定义调度程序的上下文中。正如我们稍后将看到的,当从测试中调用loadContributorsConcurrent时,我们可以在TestCoroutineDispatcher的上下文中调用它以简化测试。因此,该解决方案更加灵活。
以下是我们在调用方指定调度程序的方式:
launch(Dispatchers.Default) {
val users = loadContributorsConcurrent(service, req)
withContext(Dispatchers.Main) {
updateResults(users, startTime)
}
}
将此更改应用于我们的项目,同时让loadContributorsConcurrent在继承的上下文中启动协程。运行代码并确保协程在线程池中的线程上执行。
updateResults应该在主 UI 线程上调用,所以我们在Dispatchers.Main的上下文中调用它。withContext使用指定的协程上下文调用给定的代码,挂起直到完成,然后返回结果。表达这一点的另一种但更冗长的方式是启动一个新的协程并显式等待(通过挂起)直到它完成:launch(context) { ... }.join()。
“使用外部范围的调度程序”究竟是如何工作的?更正确的说法是“使用外部范围上下文中的调度程序”。什么是协程作用域,它与协程上下文有何不同?(这两个可能会混淆)。为什么我们需要在coroutineScope的调用里面启动新的async“贡献者”协程?我们下次将会讨论这个问题。
协程作用域负责不同协程之间的结构和父子关系。我们总是在作用域内启动新的协程。 协程上下文存储用于运行给定协程的附加技术信息,例如协程自定义名称,或指定协程应在其上调度的线程的调度程序。
当launch、async或runBlocking用于启动新的协程时,它们会自动创建相应的作用域。所有这些函数都接受一个带有接收器的 lambda 作为参数,并且隐式接收器类型是CoroutineScope:
launch { /* this: CoroutineScope */
}
新的协程只能在一个作用域内启动。launch和async被声明为CoroutineScope的扩展函数,因此当我们调用它们时,必须始终传递隐式或显式接收器。通过runBlocking启动协程是唯一的例外:runBlocking被定义为顶级函数。但是因为它阻塞了当前线程,所以它主要用于main函数和测试中作为桥接函数。
当在runBlocking、launch或async内启动新的协程时,它是在作用域内自动启动的:
import kotlinx.coroutines.*
fun main() = runBlocking { /* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
当我们在runBlocking里面调用launch时,我们是在CoroutineScope类型的隐式接收器上调用的扩展函数。或者,我们可以显式调用:this.launch。
我们可以说嵌套协程(在本例中由launch启动)是外部协程(由runBlocking启动)的子协程。这种“父子”关系通过作用域起作用:子协程从父协程对应的作用域开始。
可以在不启动新协程的情况下创建新的scope,coroutineScope函数完成了这样的功能。当我们需要在一个不访问外部作用域的suspend函数的结构化方式启动新的协程,例如 在loadContributorsConcurrent里面,我们可以创建一个新的协程作用域,它会自动成为调用该suspend函数的外部作用域的子作用域。
也可以使用GlobalScope.async或GlobalScope.launch从全局范围启动一个新的协程。这将创建一个顶级的“独立”协程。
提供协程结构的机制称为“结构化并发”。让我们看看结构化并发相对于全局范围有什么好处:
作用域一般负责子协程,它们的生命周期依附于作用域的生命周期。
如果出现问题或用户只是改变主意并决定撤销操作,则作用域可以自动取消子协程。
作用域自动等待所有子协程完成。因此,如果作用域对应一个协程,则父协程直到在其作用域内启动的所有协程都完成后才会完成。
使用GlobalScope.async时,没有将多个协程绑定到较小范围的结构。从全局范围开始的协程都是独立的;它们的生命周期仅受整个应用程序生命周期的限制。可以存储对从全局范围启动的协程的引用并等待其完成或显式取消它,但它不会像结构化的那样自动发生。
总结就是:从coroutineScope中启动多个协程叫结构化并发,因为coroutineScope会等待其中的子协程结束,或者取消coroutineScope,则它的子协程也会自动取消,也就是说coroutineScope和它的子协程是绑定在一起的。而GlobalScope启动的子协程与GlobalScope协程却不是绑定在一起的,所以从GlobalScope启动多个子协程不能叫结构化并发。双比如,我们在runBlocking中启动多个子协程,这也不是结构化并发。
取消贡献者加载
让我们比较一下loadContributorsConcurrent函数的两个版本:一个使用coroutineScope启动所有子协程,另一个使用GlobalScope. 当我们尝试取消父协程时,我们将比较两个版本的行为。
我们可以给所有发送请求的协程添加 3 秒的延迟,这样在协程启动之后,请求发送之前,我们有足够的时间取消加载:
suspend fun loadContributorsConcurrent(service: GitHubService, req: RequestData): List<User> = coroutineScope {
// ...
async {
log("starting loading for ${repo.name}")
delay(3000)
// load repo contributors
}
// ...
result
}
复制loadContributorsConcurrent的实现到loadContributorsNotCancellable(在Request5NotCancellable.kt中)并删除一个新的coroutineScope的创建。 我们的async调用现在是失败的,因此我们需要通过GlobalScope.async的方式启动它们:
suspend fun loadContributorsNotCancellable(service: GitHubService, req: RequestData): List<User> { // #1
// ...
GlobalScope.async { // #2
log("starting loading for ${repo.name}")
delay(3000)
// load repo contributors
}
// ...
return result // #3
}
该函数现在直接返回结果,而不是作为lambda内的最后一个表达式(行#1和#3),并且所有“贡献者”协程都在GlobalScope内启动,而不是作为协程范围的子协程(行#2)。
完整代码如下:
suspend fun loadContributorsNotCancellable(service: GitHubService, req: RequestData): List<User> {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.body() ?: listOf()
var index = 0
val deferredList: List<Deferred<List<User>>> = repos.map { repo ->
GlobalScope.async {
log("starting loading for ${repo.name},且先挂起5秒钟")
delay(5000)
service
.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it, ++index) }
.bodyList()
}
}
return deferredList.awaitAll().flatten().aggregate()
}
我们通过选择CONCURRENT的版本加载贡献者的方式运行程序,我们需要等到所有“贡献者”协程都启动,然后点击“取消”。查看日志,我们可以看到确实取消了所有请求,因为没有记录任何结果:
2896 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 40 repos
2901 [DefaultDispatcher-worker-2 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
...
2909 [DefaultDispatcher-worker-5 @coroutine#36] INFO Contributors - starting loading for mpp-example
/* click on 'cancel' */
/* no requests are sent */
现在让我们重复相同的过程,但这次选择NOT_CANCELLABLE选项:
2570 [AWT-EventQueue-0 @coroutine#1] INFO Contributors - kotlin: loaded 30 repos
2579 [DefaultDispatcher-worker-1 @coroutine#4] INFO Contributors - starting loading for kotlin-koans
...
2586 [DefaultDispatcher-worker-6 @coroutine#36] INFO Contributors - starting loading for mpp-example
/* click on 'cancel' */
/* but all the requests are still sent: */
6402 [DefaultDispatcher-worker-5 @coroutine#4] INFO Contributors - kotlin-koans: loaded 45 contributors
...
9555 [DefaultDispatcher-worker-8 @coroutine#36] INFO Contributors - mpp-example: loaded 8 contributors
什么都没发生!没有协程被取消,所有的请求仍然被发送。
让我们看看取消是如何在我们的“贡献者”计划中实现的。单击cancel按钮时,我们需要显式取消 “主加载协程” 。然后它会自动取消所有 “子协程” 。
这就是我们如何在按钮单击时取消“加载”的协程:
interface Contributors {
fun loadContributors() {
// ...
when (getSelectedVariant()) {
CONCURRENT -> {
launch {
val users = loadContributorsConcurrent(req)
updateResults(users, startTime)
}.setUpCancellation() // #1
}
}
}
private fun Job.setUpCancellation() {
val loadingJob = this // #2
// cancel the loading job if the 'cancel' button was clicked:
val listener = ActionListener {
loadingJob.cancel() // #3
updateLoadingStatus(CANCELED)
}
addCancelListener(listener)
// update the status and remove the listener after the loading job is completed
}
}
launch函数返回一个Job实例。Job存储“加载协程”的引用,它加载所有数据并更新结果。我们可以调用它的扩展函数setUpCancellation()(行#1),传递一个Job实例作为接收者。我们可以表达这一点的另一种显式方式:
val job = launch { ... }
job.setUpCancellation()
为了可读性,在setUpCancellation函数内部,我们可以通过新变量loadingJob(行#2)来引用它的接收者。然后我们可以为按钮添加一个监听器,然后当它被点击时loadingJob被取消(行#3)。
使用结构化并发,我们只需要取消父协程,这会将自动传播取消所有子协程。
使用外部作用域的上下文
当我们在给定作用域内启动新的协程时,确保所有协程都在相同的上下文中运行非常容易。如果需要,替换上下文也非常容易。
现在让我们回到上一节末尾的问题:“使用外部作用域的调度程序”究竟是如何工作的?(或者,更具体地说,“从外部作用域的上下文中使用调度程序”)?
通过coroutineScope或者协程构建器创建的新作用域,总是从外部作用域继承上下文。在这种情况下,外部作用域是suspend loadContributorsConcurrent被调用的作用域:
launch(Dispatchers.Default) { // outer scope
val users = loadContributorsConcurrent(service, req)
// ...
}
所有嵌套的协程都使用继承的上下文自动启动;并且调度程序是这个上下文的一部分。这就是为什么所有通过async启动的协程都是在默认调度程序的上下文中启动的:
suspend fun loadContributorsConcurrent(
service: GitHubService, req: RequestData
): List<User> = coroutineScope {
// this scope inherits the context from the outer scope
// ...
async { // nested coroutine started with the inherited context
// ...
}
// ...
}
使用结构化并发,我们可以在创建顶级协程时一次性指定主要的上下文元素(如调度程序)。然后,所有嵌套的协程都会继承上下文并仅在需要时对其进行修改。
请注意,当我们为 UI 应用程序(例如 Android)编写带有协程的代码时,通常的做法是使用CoroutineDispatchers.Main为顶级协程的默认调度程序,然后当我们需要在不同线程上运行代码时显式放置不同的调度程序。
尽管某些存储库的信息加载速度相当快,但用户只有在加载所有数据后才能看到结果列表。在那之前,加载器图标运行显示进度,但没有关于当前状态的信息,以及已经加载了哪些贡献者。
我们可以更早地显示中间结果,并在加载每个存储库的数据后显示所有贡献者:
要实现此功能,我们需要将更新 UI 的逻辑作为回调传递,以便在每个中间状态上调用它:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
suspend updateResults: (List<User>, completed: Boolean) -> Unit
) {
// loading the data
// calling `updateResults` on intermediate states
}
在调用站点上,我们通过回调更新Main线程的结果:
launch(Dispatchers.Default) {
loadContributorsProgress(service, req) { users, completed ->
withContext(Dispatchers.Main) {
updateResults(users, startTime, completed)
}
}
}
loadContributorsProgress中的updateResults参数被声明为suspend。我们需要它来调用withContext,它是对应的lambda参数中的一个suspend函数。
updateResults回调需要一个附加Boolean参数作为参数,说明是否所有加载都已完成并且我们的结果是否为最终结果。
任务
实现loadContributorsProgress函数以显示中间进度(在Request6Progress.kt文件中),基于loadContributorsSuspend函数(来自Request4Suspend.kt) 。我们将使用一个没有并发的简单版本;我们将在下一节讨论如何为这个解决方案添加并发性。
请注意,贡献者的中间列表应该以“聚合”状态显示,而不仅仅是为每个存储库加载的用户列表。加载每个新存储库的数据时,应增加每个用户的贡献总数。
解决方案
我们需要将加载的贡献者的中间列表存储在“聚合”状态。我们可以定义一个allUsers变量来存储用户列表,然后在每个新存储库的贡献者加载后更新它:
suspend fun loadContributorsProgress(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
var allUsers = emptyList<User>()
for ((index, repo) in repos.withIndex()) {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, index == repos.lastIndex)
}
}
每个updateResults请求完成后都会调用回调:
到目前为止,我们还没有使用任何并发。这段代码是顺序的,所以我们不需要同步。
我们希望同时发送请求并在获得每个存储库的响应后更新中间结果:
我们如何为这个解决方案添加并发性?Channels解决了这个问题。
众所周知,编写具有共享可变状态的代码非常困难且容易出错(即使在本教程中,我们在使用回调实现解决方案时也有机会遇到)。通过通信来共享信息代替使用共同的可变状态来共享信息来试图简化这一点。协程可以通过Channel相互通信。
Channel是允许我们在不同协程之间传递数据的通信原语。一个协程可以向Channel发送一些信息,而另一个可以从Channel接收这些信息:
发送(生产)信息的协程通常称为生产者,接收(消费)信息的协程称为消费者。当需要的时候,很多协程可以向同一个通道发送信息,很多协程可以从它那里接收信息:
请注意,当许多协程从同一通道接收信息时,每个元素仅由其中一个消费者处理一次;自动处理它意味着从通道中删除这个元素。
我们可以将通道视为类似于元素的集合(直接模拟将是一个队列:元素被添加到一端并从另一端接收)。但是,有一个重要的区别:与集合不同,即使在它们的同步版本中,一个通道也可以暂停发送和接收操作。当通道为空或已满时会发生这种情况(通道的大小可能会受到限制,然后它可能已满)。
Channel通过三个不同的接口表示:SendChannel、ReceiveChannel和扩展了前两个接口的Channel。您通常创建一个通道并将其作为SendChannel实例提供给生产者,以便只有他们可以发送给它,并作为ReceiveChannel实例提供给消费者,以便只有他们可以从中接收。请注意,send和receive方法都声明为suspend:
interface SendChannel<in E> {
suspend fun send(element: E)
fun close(): Boolean
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>
生产者可以关闭一个通道以指示没有更多元素即将到来。
库中定义了几种类型的通道。它们的不同之处在于它们可以在内部存储多少元素,以及send调用是否可以挂起。对于所有通道类型,receive调用的行为方式相同:如果通道不为空,它会接收一个元素,否则会挂起。
无限通道
无限通道是最接近队列的模拟:生产者可以将元素发送到该通道,并且它将无限增长。send调用永远不会被暂停。如果没有更多的内存,你会得到一个OutOfMemoryException. 和队列的不同出现在:当消费者尝试从空通道接收并被挂起直到一些新元素发送到该通道时。
缓冲通道
缓冲通道的大小受指定数量的限制。生产者可以将元素发送到此通道,直到达到大小限制。所有元素都在内部存储。当通道已满时,它的下一次send调用将暂停,直到出现更多可用空间。
"约会"通道
“约会”通道是没有缓冲区的通道;这与创建大小为零的缓冲通道相同。一个函数 (send或receive) 总是被挂起,直到另一个函数被调用。如果send函数被调用并且没有挂起的receive准备好处理元素,则send挂起。类似地,如果receive函数被调用并且通道是空的——或者换句话说,没有挂起的send准备好发送元素——receive调用挂起。“约会”名称(“在约定的时间和地点举行的会议”)指的是send并且receive 应该“准时会面”这一事实。
合并频道
发送到合并通道的新元素将覆盖先前发送的元素,因此接收者将始终只获得最新的元素。send调用永远不会挂起。
当你创建一个通道时,你指定它的类型或缓冲区大小(如果你需要一个缓冲的):
val rendezvousChannel = Channel<String>() // 约会通道
val bufferedChannel = Channel<String>(10) // 缓冲通道
val conflatedChannel = Channel<String>(CONFLATED) // 合并通道
val unlimitedChannel = Channel<String>(UNLIMITED) // 无限通道
在下面的示例中,我们将创建一个“集合点”通道、两个生产者协程和一个消费者协程:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*
fun main() = runBlocking<Unit> {
val channel = Channel<String>()
launch {
channel.send("A1")
channel.send("A2")
log("A done")
}
launch {
channel.send("B1")
log("B done")
}
launch {
repeat(3) {
val x = channel.receive()
log(x)
}
}
}
fun log(message: Any?) {
println("[${Thread.currentThread().name}] $message")
}
运行结果如下:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
如果您想更好地了解此示例中发生的情况,请观这个视频:https://www.youtube.com/watch?v=HpWQUoVURWQ
视频是英文的看起来也有点痛苦,这里我分步讲解,如下:
这个示例代码中,共有4个协程,每个协程都是运行在main线程上的,如下:
如上图,共有4个协程,我们着重关注序号为2、3、4的协程中的代码,如下:
因为这三个协程都是运行在main线程上的,所以这3个协程肯定是一个一个执行的,不可能并发。
首先是main @coroutine#2协程先执行,执行第send("A1")语句的时候,由于没有对应的receive()调用,所以调用send函数时导致协程被挂起,如下图:
main @coroutine#2协程被挂起后,main @coroutine#3协程得到执行,也是一样的,调用send("B1")函数,因为没有对应的receive()调用,所以协程也被挂起,如下图:
main @coroutine#2和main @coroutine#3都挂起后,main @coroutine#4得到执行,当调用receive()时,已经有对应的send("A1")和send("B1")调用,因为send("A1")是先调用的,所以"A1"被发送出去,所以它对应的main @coroutine#2协程被恢复,但是因为只有一个线程,即main线程,main线程还在main @coroutine#4上执行代码,所以main @coroutine#2虽然被恢复了,但是还没有得到执行。换句话说,调用receive()的时候,因为有对应的send调用,所以receive()不会导致协程被挂起,所以main线程继续在main @coroutine#4协程上执行,所以,虽然此时main @coroutine#2被恢复了,但是还没有得到执行,此时的状态如下:
main线程继续在main @coroutine#4上运行,执行log(x)函数,此时就能看到控制台输出 “[main @coroutine#4] A1”,因为是一个3次的循环体,输出一次后,又执行到receive()函数,此时还有对应的send("B1")在等着,所以"B1"被发送出去,send("B1")对应的main @coroutine#3协程被恢复,也因为调用receive()的时候,因为有对应的send调用,所以receive()不会导致协程被挂起,所以main线程继续在main @coroutine#4协程上执行,此时的状态如下:
main线程继续在main @coroutine#4上运行,执行log(x)函数,此时就能看到控制台输出 “[main @coroutine#4] B1”,然后第三次循环执行到receive()函数,此时没有对应的send()调用,所以main @coroutine#4被挂起,此时有两个被恢复的协程还在等着被执行,因为是main @coroutine#2先被恢复的,所以会先得到执行,此时状态如下:
main @coroutine#2得到执行权,开始执行第二个语句,即send("A2"),由于此时有对应的receive()函数在等着了,所以receive()对应的main @coroutine#4协程被恢复,但是没有得到执行,因为调用send("A2")时有对应的receive()函数在等着了,所以send("A2")不会导致协程被挂起,所以main线程继续在main @coroutine#2上执行,此时的状态如下:
main线程继续在main @coroutine#2协程上运行,执行log("A done")后,就可以在控制台看到输出 “[main @coroutine#2] A done”,此时协程就运行结束了,可以被垃圾回收器回收了,此时有两个被恢复的协程,由于main @coroutine#3是先被恢复的,所以优先得到执行权,此时状态如下:
main @coroutine#3协程得到执行权,执行下一个语句log("B done"),此时就可以在控制台看到输出 “[main @coroutine#3] B done”,此时该协程也执行结束了,最后一个协程就得到了执行权,此时的状态如下:
[main @coroutine#3]得到了执行权开始执行,接收到最后发送过来的"A2"并调用log(x)进行打印,然后在控制台就能看到输出[main @coroutine#4] A2,至此,三个协程都执行结束了,状态如下:
此时,控制台的所有输出如下:
[main @coroutine#4] A1
[main @coroutine#4] B1
[main @coroutine#2] A done
[main @coroutine#3] B done
[main @coroutine#4] A2
现在,应该能理解为什么是这样的输出结果了。
任务
实现loadContributorsChannels函数,以并发地请求所有 GitHub 贡献者,但同时显示中间进度。使用这两个前面的函数:Request5Concurrent.kt中的loadContributorsConcurrent和Request6Progress.kt中的loadContributorsProgress。
提示
同时接收不同存储库的贡献者列表的不同协程可以将所有接收到的结果发送到同一个通道:
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = ...
// ...
channel.send(users)
}
}
然后来自这个通道中的元素可以一个一个地接收并处理:
repeat(repos.size) {
val users = channel.receive()
...
}
由于我们调用receive按顺序调用,因此不需要额外的同步。
解决方案
与loadContributorsProgress函数一样,我们可以创建一个allUsers变量来存储“所有贡献者”列表的中间状态。当我们从频道接收到每个新列表时,我们将其添加到所有用户的列表中,聚合结果,并使用updateResults回调更新状态:
suspend fun loadContributorsChannels(
service: GitHubService,
req: RequestData,
updateResults: suspend (List<User>, completed: Boolean) -> Unit
) = coroutineScope {
val repos = service
.getOrgRepos(req.org)
.also { logRepos(req, it) }
.bodyList()
val channel = Channel<List<User>>()
for (repo in repos) {
launch {
val users = service.getRepoContributors(req.org, repo.name)
.also { logUsers(repo, it) }
.bodyList()
channel.send(users)
}
}
var allUsers = emptyList<User>()
repeat(repos.size) {
val users = channel.receive()
allUsers = (allUsers + users).aggregate()
updateResults(allUsers, it == repos.lastIndex)
}
}
请注意,不同存储库的结果会在准备好后立即添加到通道中。起初,当所有请求都发送完毕而没有收到数据时,receive暂停。在这种情况下,整个“负载贡献者”协程暂停。然后,当用户列表发送到频道时,“负载贡献者”协程恢复,receive调用返回这个列表,结果立即更新。
我们已经学习了如何使用挂起函数、并发执行协程以及使用通道在协程之间共享信息。
请注意,协程和通道都不会完全消除并发带来的复杂性,但是当您需要推理并了解正在发生的事情时,它们肯定会让您的生活更轻松。
接下来,我们将讨论如何测试使用协程的代码。
让我们讨论如何测试使用协程的代码。最好测试一下我们的解决方案,确保使用并发协程的解决方案比使用suspend函数的解决方案更快,并检查使用通道的解决方案是否比简单的“进度”解决方案更快。让我们看看如何比较这些解决方案的总运行时间。
让我们模拟一个 GitHub 服务并让该服务在给定超时后返回结果:
repos request - returns an answer within 1000 ms delay
repo-1 - 1000 ms delay
repo-2 - 1200 ms delay
repo-3 - 800 ms delay
然后,我们可以测试具有suspend函数的顺序解决方案应该花费大约 4000 毫秒:
4000 = 1000 + (1000 + 1200 + 800)
并发解决方案大约需要 2200 毫秒:
2200 = 1000 + max(1000, 1200, 800)
对于显示进度的解决方案,我们还可以检查带有时间戳的中间结果。
相应的测试数据在定义在test/contributors/testData.kt中,还有文件Request4SuspendKtTest,…Request7ChannelsKtTest包含使用模拟服务调用的简单测试。
但是,我们这里有两个问题:
这些测试需要很长时间才能运行。每次测试大约需要 2 到 4 秒,所以我们每次都需要等待结果。这种方法不是很有效。
我们不能依赖解决方案运行的确切时间,因为它仍然需要额外的时间来预热和运行代码等。我们可以添加一个额外的常量,但它会因机器而异。请注意,模拟服务延迟应高于此常数,以便我们可以看到差异。如果常数为 0.5 秒,则将延迟设为 0.1 秒是不够的。
更好的方法是使用特殊的框架来测试时间,同时多次运行相同的代码(这会进一步增加总时间),但是学习和设置起来很复杂。
然而,在这种情况下,我们想要测试一件非常简单的事情:我们提供的测试延迟解决方案的行为符合我们的预期;一个比另一个快。我们还对现实生活中的性能测试不感兴趣。
为了解决这些问题,我们可以使用虚拟时间。为此,我们需要使用一个特殊的测试调度器。它跟踪从一开始就经过的虚拟时间,并立即实时运行所有内容。当我们在这个调度器上运行协程时,它delay会立即返回并提前虚拟时间。
使用这种机制的测试运行速度很快,但您仍然可以检查虚拟时间不同时刻发生的情况。总运行时间显着减少:
如何利用虚拟时间?将runBlocking调用替换为runBlockingTest。runBlockingTest将扩展 lambda 到TestCoroutineScope作为参数。当你在这个特殊范围内一个挂起函数delay时,会增加虚拟时间而不是实时延迟:
@Test
fun testDelayInSuspend() = runBlockingTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
foo()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 6 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun foo() {
delay(1000) // auto-advances without delay
println("foo") // executes eagerly when foo() is called
}
我们可以使用TestCoroutineScope的currentTime属性来检查当前的虚拟时间。请注意,此示例中的实际运行时间为几毫秒,而虚拟时间恰好等于延迟参数,即 1000 毫秒。
为了在子协程中享受“虚拟”delay的全部效果,我们应该使用TestCoroutineDispatcher来启动所有子协程。 否则,它将无法正常工作。这个调度器自动继承自另一个TestCoroutineScope,除非我们提供不同的调度器:
@Test
fun testDelayInLaunch() = runBlockingTest {
val realStartTime = System.currentTimeMillis()
val virtualStartTime = currentTime
bar()
println("${System.currentTimeMillis() - realStartTime} ms") // ~ 11 ms
println("${currentTime - virtualStartTime} ms") // 1000 ms
}
suspend fun bar() = coroutineScope {
launch {
delay(1000) // auto-advances without delay
println("bar") // executes eagerly when bar() is called
}
}
在上面的例子中,我们可以尝试使用Dispatchers.Default上下文来调用launch以确保测试失败:我们将收到一个异常,说明作业尚未完成。
只有当它使用继承的上下文启动子协程时,我们才能以这种方式测试loadContributorsConcurrent函数,而无需使用Dispatchers.Default调度程序对其进行修改。我们可以在调用函数时而不是在定义函数时指定诸如调度程序之类的上下文元素:这更灵活且更易于测试。
请注意,支持虚拟时间的测试 API 是实验性的,将来可能会发生变化。默认情况下,如果我们使用它,我们会看到编译器警告。为了抑制这些警告,我们需要注释测试函数或包含测试的整个类。我们可以添加@OptIn(ExperimentalCoroutinesApi::class)到我们的测试类或函数中。通过添加此类注释,我们强调我们了解 API 可以更改,并准备好在需要时更新我们的用法(很可能是自动)。
我们还需要添加编译器参数,告诉它我们正在使用实验性 API:
compileTestKotlin {
kotlinOptions {
freeCompilerArgs += "-Xuse-experimental=kotlin.Experimental"
}
}
在本教程对应的项目中,它已经添加到 gradle 脚本中。
任务
重构在tests/tasks/中的以下所有测试到使用虚拟时间而不是实时:
Request4SuspendKtTest.kt
Request5ConcurrentKtTest.kt
Request6ProgressKtTest.kt
Request7ChannelsKtTest.kt
将总运行时间与此重构之前的时间进行比较。
提示
使用runBlockingTest替换runBlocking调用,还有用currentTime替换System.currentTimeMillis():
@Test
fun test() = runBlockingTest {
val startTime = currentTime
// action
val totalTime = currentTime - startTime
// testing result
}
取消注释检查确切虚拟时间的评论断言。我们不要忘记添加
@UseExperimental(ExperimentalCoroutinesApi::class).
解决方案
这是并发案例的解决方案:
fun testConcurrent() = runBlockingTest {
val startTime = currentTime
val result = loadContributorsConcurrent(MockGithubService, testRequestData)
Assert.assertEquals("Wrong result for 'loadContributorsConcurrent'", expectedConcurrentResults.users, result)
val totalTime = currentTime - startTime
Assert.assertEquals(
"The calls run concurrently, so the total virtual time should be 2200 ms: " +
"1000 for repos request plus max(1000, 1200, 800) = 1200 for concurrent contributors requests)",
expectedConcurrentResults.timeFromStart, totalTime
)
}
当我们检查进度时,我们首先检查结果是否准确地在预期的虚拟时间可用,然后我们检查结果:
fun testChannels() = runBlockingTest {
val startTime = currentTime
var index = 0
loadContributorsChannels(MockGithubService, testRequestData) {
users, _ ->
val expected = concurrentProgressResults[index++]
val time = currentTime - startTime
Assert.assertEquals("Expected intermediate results after ${expected.timeFromStart} ms:",
expected.timeFromStart, time)
Assert.assertEquals("Wrong intermediate results after $time:", expected.users, users)
}
}
与“progress”版本相比,带有“channels”的最后一个版本的第一个中间结果更快可用,我们可以看到使用虚拟时间的测试中的差异。
其余“暂停”和“进度”任务的测试非常相似;您可以在项目“解决方案”分支中找到它们。
您可以在这里找到有关使用虚拟时间和实验测试包的更多信息。与我们分享您对它的效果的反馈!