mirror of https://github.com/M66B/FairEmail.git
parent
d0f5d7ffdf
commit
55433cd679
@ -0,0 +1,478 @@
|
||||
/*
|
||||
* Copyright 2019 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package androidx.lifecycle
|
||||
|
||||
import android.annotation.SuppressLint
|
||||
import android.os.Build
|
||||
import androidx.annotation.MainThread
|
||||
import androidx.annotation.RequiresApi
|
||||
import java.time.Duration
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
|
||||
internal const val DEFAULT_TIMEOUT = 5000L
|
||||
|
||||
/**
|
||||
* Interface that allows controlling a [LiveData] from a coroutine block.
|
||||
*
|
||||
* @see liveData
|
||||
*/
|
||||
public interface LiveDataScope<T> {
|
||||
/**
|
||||
* Set's the [LiveData]'s value to the given [value]. If you've called [emitSource] previously,
|
||||
* calling [emit] will remove that source.
|
||||
*
|
||||
* Note that this function suspends until the value is set on the [LiveData].
|
||||
*
|
||||
* @param value The new value for the [LiveData]
|
||||
*
|
||||
* @see emitSource
|
||||
*/
|
||||
public suspend fun emit(value: T)
|
||||
|
||||
/**
|
||||
* Add the given [LiveData] as a source, similar to [MediatorLiveData.addSource]. Calling this
|
||||
* method will remove any source that was yielded before via [emitSource].
|
||||
*
|
||||
* @param source The [LiveData] instance whose values will be dispatched from the current
|
||||
* [LiveData].
|
||||
*
|
||||
* @see emit
|
||||
* @see MediatorLiveData.addSource
|
||||
* @see MediatorLiveData.removeSource
|
||||
*/
|
||||
public suspend fun emitSource(source: LiveData<T>): DisposableHandle
|
||||
|
||||
/**
|
||||
* References the current value of the [LiveData].
|
||||
*
|
||||
* If the block never `emit`ed a value, [latestValue] will be `null`. You can use this
|
||||
* value to check what was then latest value `emit`ed by your `block` before it got cancelled.
|
||||
*
|
||||
* Note that if the block called [emitSource], then `latestValue` will be last value
|
||||
* dispatched by the `source` [LiveData].
|
||||
*/
|
||||
public val latestValue: T?
|
||||
}
|
||||
|
||||
internal class LiveDataScopeImpl<T>(
|
||||
internal var target: CoroutineLiveData<T>,
|
||||
context: CoroutineContext
|
||||
) : LiveDataScope<T> {
|
||||
|
||||
override val latestValue: T?
|
||||
get() = target.value
|
||||
|
||||
// use `liveData` provided context + main dispatcher to communicate with the target
|
||||
// LiveData. This gives us main thread safety as well as cancellation cooperation
|
||||
private val coroutineContext = context + Dispatchers.Main.immediate
|
||||
|
||||
override suspend fun emitSource(source: LiveData<T>): DisposableHandle =
|
||||
withContext(coroutineContext) {
|
||||
return@withContext target.emitSource(source)
|
||||
}
|
||||
|
||||
@SuppressLint("NullSafeMutableLiveData")
|
||||
override suspend fun emit(value: T) = withContext(coroutineContext) {
|
||||
target.clearSource()
|
||||
target.value = value
|
||||
}
|
||||
}
|
||||
|
||||
internal suspend fun <T> MediatorLiveData<T>.addDisposableSource(
|
||||
source: LiveData<T>
|
||||
): EmittedSource = withContext(Dispatchers.Main.immediate) {
|
||||
addSource(source) {
|
||||
value = it
|
||||
}
|
||||
EmittedSource(
|
||||
source = source,
|
||||
mediator = this@addDisposableSource
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Holder class that keeps track of the previously dispatched [LiveData].
|
||||
* It implements [DisposableHandle] interface while also providing a suspend clear function
|
||||
* that we can use internally.
|
||||
*/
|
||||
internal class EmittedSource(
|
||||
private val source: LiveData<*>,
|
||||
private val mediator: MediatorLiveData<*>
|
||||
) : DisposableHandle {
|
||||
// @MainThread
|
||||
private var disposed = false
|
||||
/**
|
||||
* Unlike [dispose] which cannot be sync because it not a coroutine (and we do not want to
|
||||
* lock), this version is a suspend function and does not return until source is removed.
|
||||
*/
|
||||
suspend fun disposeNow() = withContext(Dispatchers.Main.immediate) {
|
||||
removeSource()
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
CoroutineScope(Dispatchers.Main.immediate).launch {
|
||||
removeSource()
|
||||
}
|
||||
}
|
||||
|
||||
@MainThread
|
||||
private fun removeSource() {
|
||||
if (!disposed) {
|
||||
mediator.removeSource(source)
|
||||
disposed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal typealias Block<T> = suspend LiveDataScope<T>.() -> Unit
|
||||
|
||||
/**
|
||||
* Handles running a block at most once to completion.
|
||||
*/
|
||||
internal class BlockRunner<T>(
|
||||
private val liveData: CoroutineLiveData<T>,
|
||||
private val block: Block<T>,
|
||||
private val timeoutInMs: Long,
|
||||
private val scope: CoroutineScope,
|
||||
private val onDone: () -> Unit
|
||||
) {
|
||||
// currently running block job.
|
||||
private var runningJob: Job? = null
|
||||
|
||||
// cancelation job created in cancel.
|
||||
private var cancellationJob: Job? = null
|
||||
|
||||
@MainThread
|
||||
fun maybeRun() {
|
||||
cancellationJob?.cancel()
|
||||
cancellationJob = null
|
||||
if (runningJob != null) {
|
||||
return
|
||||
}
|
||||
runningJob = scope.launch {
|
||||
val liveDataScope = LiveDataScopeImpl(liveData, coroutineContext)
|
||||
block(liveDataScope)
|
||||
onDone()
|
||||
}
|
||||
}
|
||||
|
||||
@MainThread
|
||||
fun cancel() {
|
||||
if (cancellationJob != null) {
|
||||
error("Cancel call cannot happen without a maybeRun")
|
||||
}
|
||||
cancellationJob = scope.launch(Dispatchers.Main.immediate) {
|
||||
delay(timeoutInMs)
|
||||
if (!liveData.hasActiveObservers()) {
|
||||
// one last check on active observers to avoid any race condition between starting
|
||||
// a running coroutine and cancelation
|
||||
runningJob?.cancel()
|
||||
runningJob = null
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal class CoroutineLiveData<T>(
|
||||
context: CoroutineContext = EmptyCoroutineContext,
|
||||
timeoutInMs: Long = DEFAULT_TIMEOUT,
|
||||
block: Block<T>
|
||||
) : MediatorLiveData<T>() {
|
||||
private var blockRunner: BlockRunner<T>?
|
||||
private var emittedSource: EmittedSource? = null
|
||||
|
||||
init {
|
||||
// use an intermediate supervisor job so that if we cancel individual block runs due to losing
|
||||
// observers, it won't cancel the given context as we only cancel w/ the intention of possibly
|
||||
// relaunching using the same parent context.
|
||||
val supervisorJob = SupervisorJob(context[Job])
|
||||
|
||||
// The scope for this LiveData where we launch every block Job.
|
||||
// We default to Main dispatcher but developer can override it.
|
||||
// The supervisor job is added last to isolate block runs.
|
||||
val scope = CoroutineScope(Dispatchers.Main.immediate + context + supervisorJob)
|
||||
blockRunner = BlockRunner(
|
||||
liveData = this,
|
||||
block = block,
|
||||
timeoutInMs = timeoutInMs,
|
||||
scope = scope
|
||||
) {
|
||||
blockRunner = null
|
||||
}
|
||||
}
|
||||
|
||||
internal suspend fun emitSource(source: LiveData<T>): DisposableHandle {
|
||||
clearSource()
|
||||
val newSource = addDisposableSource(source)
|
||||
emittedSource = newSource
|
||||
return newSource
|
||||
}
|
||||
|
||||
internal suspend fun clearSource() {
|
||||
emittedSource?.disposeNow()
|
||||
emittedSource = null
|
||||
}
|
||||
|
||||
override fun onActive() {
|
||||
super.onActive()
|
||||
blockRunner?.maybeRun()
|
||||
}
|
||||
|
||||
override fun onInactive() {
|
||||
super.onInactive()
|
||||
blockRunner?.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a LiveData that has values yielded from the given [block] that executes on a
|
||||
* [LiveDataScope].
|
||||
*
|
||||
* The [block] starts executing when the returned [LiveData] becomes [active](LiveData.onActive).
|
||||
* If the [LiveData] becomes [inactive](LiveData.onInactive) while the [block] is executing, it
|
||||
* will be cancelled after [timeoutInMs] milliseconds unless the [LiveData] becomes active again
|
||||
* before that timeout (to gracefully handle cases like Activity rotation). Any value
|
||||
* [LiveDataScope.emit]ed from a cancelled [block] will be ignored.
|
||||
*
|
||||
* After a cancellation, if the [LiveData] becomes active again, the [block] will be re-executed
|
||||
* from the beginning. If you would like to continue the operations based on where it was stopped
|
||||
* last, you can use the [LiveDataScope.latestValue] function to get the last
|
||||
* [LiveDataScope.emit]ed value.
|
||||
|
||||
* If the [block] completes successfully *or* is cancelled due to reasons other than [LiveData]
|
||||
* becoming inactive, it *will not* be re-executed even after [LiveData] goes through active
|
||||
* inactive cycle.
|
||||
*
|
||||
* As a best practice, it is important for the [block] to cooperate in cancellation. See kotlin
|
||||
* coroutines documentation for details
|
||||
* https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html.
|
||||
*
|
||||
* The [timeoutInMs] can be changed to fit different use cases better, for example increasing it
|
||||
* will give more time to the [block] to complete even if [LiveData] is inactive. It is good for
|
||||
* cases when [block] is finite (meaning it can complete successfully) and is costly to restart.
|
||||
* Otherwise if a [block] is cheap to restart, decreasing the [timeoutInMs] value will allow to
|
||||
* yield less values that aren't consumed by anything.
|
||||
*
|
||||
* ```
|
||||
* // a simple LiveData that receives value 3, 3 seconds after being observed for the first time.
|
||||
* val data : LiveData<Int> = liveData {
|
||||
* delay(3000)
|
||||
* emit(3)
|
||||
* }
|
||||
*
|
||||
*
|
||||
* // a LiveData that fetches a `User` object based on a `userId` and refreshes it every 30 seconds
|
||||
* // as long as it is observed
|
||||
* val userId : LiveData<String> = ...
|
||||
* val user = userId.switchMap { id ->
|
||||
* liveData {
|
||||
* while(true) {
|
||||
* // note that `while(true)` is fine because the `delay(30_000)` below will cooperate in
|
||||
* // cancellation if LiveData is not actively observed anymore
|
||||
* val data = api.fetch(id) // errors are ignored for brevity
|
||||
* emit(data)
|
||||
* delay(30_000)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // A retrying data fetcher with doubling back-off
|
||||
* val user = liveData {
|
||||
* var backOffTime = 1_000
|
||||
* var succeeded = false
|
||||
* while(!succeeded) {
|
||||
* try {
|
||||
* emit(api.fetch(id))
|
||||
* succeeded = true
|
||||
* } catch(ioError : IOException) {
|
||||
* delay(backOffTime)
|
||||
* backOffTime *= minOf(backOffTime * 2, 60_000)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // a LiveData that tries to load the `User` from local cache first and then tries to fetch
|
||||
* // from the server and also yields the updated value
|
||||
* val user = liveData {
|
||||
* // dispatch loading first
|
||||
* emit(LOADING(id))
|
||||
* // check local storage
|
||||
* val cached = cache.loadUser(id)
|
||||
* if (cached != null) {
|
||||
* emit(cached)
|
||||
* }
|
||||
* if (cached == null || cached.isStale()) {
|
||||
* val fresh = api.fetch(id) // errors are ignored for brevity
|
||||
* cache.save(fresh)
|
||||
* emit(fresh)
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // a LiveData that immediately receives a LiveData<User> from the database and yields it as a
|
||||
* // source but also tries to back-fill the database from the server
|
||||
* val user = liveData {
|
||||
* val fromDb: LiveData<User> = roomDatabase.loadUser(id)
|
||||
* emitSource(fromDb)
|
||||
* val updated = api.fetch(id) // errors are ignored for brevity
|
||||
* // Since we are using Room here, updating the database will update the `fromDb` LiveData
|
||||
* // that was obtained above. See Room's documentation for more details.
|
||||
* // https://developer.android.com/training/data-storage/room/accessing-data#query-observable
|
||||
* roomDatabase.insert(updated)
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @param context The CoroutineContext to run the given block in. Defaults to
|
||||
* [EmptyCoroutineContext] combined with
|
||||
* [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
|
||||
* @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
|
||||
* ([LiveData.hasActiveObservers]. Defaults to [DEFAULT_TIMEOUT].
|
||||
* @param block The block to run when the [LiveData] has active observers.
|
||||
*/
|
||||
@JvmOverloads
|
||||
public fun <T> liveData(
|
||||
context: CoroutineContext = EmptyCoroutineContext,
|
||||
timeoutInMs: Long = DEFAULT_TIMEOUT,
|
||||
block: suspend LiveDataScope<T>.() -> Unit
|
||||
): LiveData<T> = CoroutineLiveData(context, timeoutInMs, block)
|
||||
|
||||
/**
|
||||
* Builds a LiveData that has values yielded from the given [block] that executes on a
|
||||
* [LiveDataScope].
|
||||
*
|
||||
* The [block] starts executing when the returned [LiveData] becomes [active](LiveData.onActive).
|
||||
* If the [LiveData] becomes [inactive](LiveData.onInactive) while the [block] is executing, it
|
||||
* will be cancelled after the [timeout] duration unless the [LiveData] becomes active again
|
||||
* before that timeout (to gracefully handle cases like Activity rotation). Any value
|
||||
* [LiveDataScope.emit]ed from a cancelled [block] will be ignored.
|
||||
*
|
||||
* After a cancellation, if the [LiveData] becomes active again, the [block] will be re-executed
|
||||
* from the beginning. If you would like to continue the operations based on where it was stopped
|
||||
* last, you can use the [LiveDataScope.latestValue] function to get the last
|
||||
* [LiveDataScope.emit]ed value.
|
||||
|
||||
* If the [block] completes successfully *or* is cancelled due to reasons other than [LiveData]
|
||||
* becoming inactive, it *will not* be re-executed even after [LiveData] goes through active
|
||||
* inactive cycle.
|
||||
*
|
||||
* As a best practice, it is important for the [block] to cooperate in cancellation. See kotlin
|
||||
* coroutines documentation for details
|
||||
* https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html.
|
||||
*
|
||||
* The [timeout] can be changed to fit different use cases better, for example increasing it
|
||||
* will give more time to the [block] to complete even if [LiveData] is inactive. It is good for
|
||||
* cases when [block] is finite (meaning it can complete successfully) and is costly to restart.
|
||||
* Otherwise if a [block] is cheap to restart, decreasing the [timeout] value will allow to
|
||||
* yield less values that aren't consumed by anything.
|
||||
*
|
||||
* ```
|
||||
* // a simple LiveData that receives value 3, 3 seconds after being observed for the first time.
|
||||
* val data : LiveData<Int> = liveData {
|
||||
* delay(3000)
|
||||
* emit(3)
|
||||
* }
|
||||
*
|
||||
*
|
||||
* // a LiveData that fetches a `User` object based on a `userId` and refreshes it every 30 seconds
|
||||
* // as long as it is observed
|
||||
* val userId : LiveData<String> = ...
|
||||
* val user = userId.switchMap { id ->
|
||||
* liveData {
|
||||
* while(true) {
|
||||
* // note that `while(true)` is fine because the `delay(30_000)` below will cooperate in
|
||||
* // cancellation if LiveData is not actively observed anymore
|
||||
* val data = api.fetch(id) // errors are ignored for brevity
|
||||
* emit(data)
|
||||
* delay(30_000)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // A retrying data fetcher with doubling back-off
|
||||
* val user = liveData {
|
||||
* var backOffTime = 1_000
|
||||
* var succeeded = false
|
||||
* while(!succeeded) {
|
||||
* try {
|
||||
* emit(api.fetch(id))
|
||||
* succeeded = true
|
||||
* } catch(ioError : IOException) {
|
||||
* delay(backOffTime)
|
||||
* backOffTime *= minOf(backOffTime * 2, 60_000)
|
||||
* }
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // a LiveData that tries to load the `User` from local cache first and then tries to fetch
|
||||
* // from the server and also yields the updated value
|
||||
* val user = liveData {
|
||||
* // dispatch loading first
|
||||
* emit(LOADING(id))
|
||||
* // check local storage
|
||||
* val cached = cache.loadUser(id)
|
||||
* if (cached != null) {
|
||||
* emit(cached)
|
||||
* }
|
||||
* if (cached == null || cached.isStale()) {
|
||||
* val fresh = api.fetch(id) // errors are ignored for brevity
|
||||
* cache.save(fresh)
|
||||
* emit(fresh)
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* // a LiveData that immediately receives a LiveData<User> from the database and yields it as a
|
||||
* // source but also tries to back-fill the database from the server
|
||||
* val user = liveData {
|
||||
* val fromDb: LiveData<User> = roomDatabase.loadUser(id)
|
||||
* emitSource(fromDb)
|
||||
* val updated = api.fetch(id) // errors are ignored for brevity
|
||||
* // Since we are using Room here, updating the database will update the `fromDb` LiveData
|
||||
* // that was obtained above. See Room's documentation for more details.
|
||||
* // https://developer.android.com/training/data-storage/room/accessing-data#query-observable
|
||||
* roomDatabase.insert(updated)
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* @param context The CoroutineContext to run the given block in. Defaults to
|
||||
* [EmptyCoroutineContext] combined with
|
||||
* [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate].
|
||||
* @param timeout The timeout duration before cancelling the block if there are no active observers
|
||||
* ([LiveData.hasActiveObservers].
|
||||
* @param block The block to run when the [LiveData] has active observers.
|
||||
*/
|
||||
@RequiresApi(Build.VERSION_CODES.O)
|
||||
@JvmOverloads
|
||||
public fun <T> liveData(
|
||||
timeout: Duration,
|
||||
context: CoroutineContext = EmptyCoroutineContext,
|
||||
block: suspend LiveDataScope<T>.() -> Unit
|
||||
): LiveData<T> = CoroutineLiveData(context, Api26Impl.toMillis(timeout), block)
|
||||
|
||||
@RequiresApi(26)
|
||||
internal object Api26Impl {
|
||||
fun toMillis(timeout: Duration): Long {
|
||||
return timeout.toMillis()
|
||||
}
|
||||
}
|
@ -0,0 +1,161 @@
|
||||
/*
|
||||
* Copyright 2019 The Android Open Source Project
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
@file:JvmName("FlowLiveDataConversions")
|
||||
|
||||
package androidx.lifecycle
|
||||
|
||||
import android.annotation.SuppressLint
|
||||
import android.os.Build
|
||||
import androidx.annotation.RequiresApi
|
||||
import androidx.arch.core.executor.ArchTaskExecutor
|
||||
import java.time.Duration
|
||||
import kotlin.coroutines.CoroutineContext
|
||||
import kotlin.coroutines.EmptyCoroutineContext
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.channels.awaitClose
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.callbackFlow
|
||||
import kotlinx.coroutines.flow.conflate
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
|
||||
/**
|
||||
* Creates a LiveData that has values collected from the origin [Flow].
|
||||
*
|
||||
* If the origin [Flow] is a [StateFlow], then the initial value will be populated
|
||||
* to the [LiveData]'s value field on the main thread.
|
||||
*
|
||||
* The upstream flow collection starts when the returned [LiveData] becomes active
|
||||
* ([LiveData.onActive]).
|
||||
* If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
|
||||
* the flow collection will be cancelled after [timeoutInMs] milliseconds unless the [LiveData]
|
||||
* becomes active again before that timeout (to gracefully handle cases like Activity rotation).
|
||||
*
|
||||
* After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
|
||||
* be re-executed.
|
||||
*
|
||||
* If the upstream flow completes successfully *or* is cancelled due to reasons other than
|
||||
* [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
|
||||
* active inactive cycle.
|
||||
*
|
||||
* If flow completes with an exception, then exception will be delivered to the
|
||||
* [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
|
||||
* By default [EmptyCoroutineContext] is used to so an exception will be delivered to main's
|
||||
* thread [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
|
||||
* expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
|
||||
* to emit a helpful error object.
|
||||
*
|
||||
* The [timeoutInMs] can be changed to fit different use cases better, for example increasing it
|
||||
* will give more time to flow to complete before being canceled and is good for finite flows
|
||||
* that are costly to restart. Otherwise if a flow is cheap to restart decreasing the [timeoutInMs]
|
||||
* value will allow to produce less values that aren't consumed by anything.
|
||||
*
|
||||
* @param context The CoroutineContext to collect the upstream flow in. Defaults to
|
||||
* [EmptyCoroutineContext] combined with
|
||||
* [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
|
||||
* @param timeoutInMs The timeout in ms before cancelling the block if there are no active observers
|
||||
* ([LiveData.hasActiveObservers]. Defaults to [DEFAULT_TIMEOUT].
|
||||
*/
|
||||
@JvmOverloads
|
||||
public fun <T> Flow<T>.asLiveData(
|
||||
context: CoroutineContext = EmptyCoroutineContext,
|
||||
timeoutInMs: Long = DEFAULT_TIMEOUT
|
||||
): LiveData<T> = liveData(context, timeoutInMs) {
|
||||
collect {
|
||||
emit(it)
|
||||
}
|
||||
}.also { liveData ->
|
||||
val flow = this
|
||||
if (flow is StateFlow<T>) {
|
||||
@SuppressLint("RestrictedApi")
|
||||
if (ArchTaskExecutor.getInstance().isMainThread) {
|
||||
liveData.value = flow.value
|
||||
} else {
|
||||
liveData.postValue(flow.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a [Flow] containing values dispatched by originating [LiveData]: at the start
|
||||
* a flow collector receives the latest value held by LiveData and then observes LiveData updates.
|
||||
*
|
||||
* When a collection of the returned flow starts the originating [LiveData] becomes
|
||||
* [active][LiveData.onActive]. Similarly, when a collection completes [LiveData] becomes
|
||||
* [inactive][LiveData.onInactive].
|
||||
*
|
||||
* BackPressure: the returned flow is conflated. There is no mechanism to suspend an emission by
|
||||
* LiveData due to a slow collector, so collector always gets the most recent value emitted.
|
||||
*/
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
public fun <T> LiveData<T>.asFlow(): Flow<T> = callbackFlow {
|
||||
val observer = Observer<T> {
|
||||
trySend(it)
|
||||
}
|
||||
withContext(Dispatchers.Main.immediate) {
|
||||
observeForever(observer)
|
||||
}
|
||||
|
||||
awaitClose {
|
||||
GlobalScope.launch(Dispatchers.Main.immediate) {
|
||||
removeObserver(observer)
|
||||
}
|
||||
}
|
||||
}.conflate()
|
||||
|
||||
/**
|
||||
* Creates a LiveData that has values collected from the origin [Flow].
|
||||
*
|
||||
* The upstream flow collection starts when the returned [LiveData] becomes active
|
||||
* ([LiveData.onActive]).
|
||||
* If the [LiveData] becomes inactive ([LiveData.onInactive]) while the flow has not completed,
|
||||
* the flow collection will be cancelled after [timeout] unless the [LiveData]
|
||||
* becomes active again before that timeout (to gracefully handle cases like Activity rotation).
|
||||
*
|
||||
* After a cancellation, if the [LiveData] becomes active again, the upstream flow collection will
|
||||
* be re-executed.
|
||||
*
|
||||
* If the upstream flow completes successfully *or* is cancelled due to reasons other than
|
||||
* [LiveData] becoming inactive, it *will not* be re-collected even after [LiveData] goes through
|
||||
* active inactive cycle.
|
||||
*
|
||||
* If flow completes with an exception, then exception will be delivered to the
|
||||
* [CoroutineExceptionHandler][kotlinx.coroutines.CoroutineExceptionHandler] of provided [context].
|
||||
* By default [EmptyCoroutineContext] is used to so an exception will be delivered to main's
|
||||
* thread [UncaughtExceptionHandler][Thread.UncaughtExceptionHandler]. If your flow upstream is
|
||||
* expected to throw, you can use [catch operator][kotlinx.coroutines.flow.catch] on upstream flow
|
||||
* to emit a helpful error object.
|
||||
*
|
||||
* The [timeout] can be changed to fit different use cases better, for example increasing it
|
||||
* will give more time to flow to complete before being canceled and is good for finite flows
|
||||
* that are costly to restart. Otherwise if a flow is cheap to restart decreasing the [timeout]
|
||||
* value will allow to produce less values that aren't consumed by anything.
|
||||
*
|
||||
* @param context The CoroutineContext to collect the upstream flow in. Defaults to
|
||||
* [EmptyCoroutineContext] combined with
|
||||
* [Dispatchers.Main.immediate][kotlinx.coroutines.MainCoroutineDispatcher.immediate]
|
||||
* @param timeout The timeout in ms before cancelling the block if there are no active observers
|
||||
* ([LiveData.hasActiveObservers]. Defaults to [DEFAULT_TIMEOUT].
|
||||
*/
|
||||
@RequiresApi(Build.VERSION_CODES.O)
|
||||
public fun <T> Flow<T>.asLiveData(
|
||||
timeout: Duration,
|
||||
context: CoroutineContext = EmptyCoroutineContext
|
||||
): LiveData<T> = asLiveData(context, Api26Impl.toMillis(timeout))
|
Loading…
Reference in new issue