Kotlin Flow Interview Questions 2025
15 Kotlin Flow questions with complete, production-level answers and code examples. Covers cold vs hot flows, StateFlow vs SharedFlow, operators, testing with Turbine, and everything else that comes up in Android Staff engineer interviews.
What is the difference between cold and hot flows?
A cold flow does not produce values until a collector subscribes to it. Each collector gets its own independent execution — if two collectors subscribe to the same cold flow, the flow's producer block runs twice. The standard flow { } builder creates cold flows. A hot flow is active regardless of whether any collector is subscribed. Values can be emitted and lost if no collector is listening. StateFlow and SharedFlow are hot flows. A database query wrapped in flow { } is cold; a WebSocket connection wrapped in callbackFlow converted to shareIn is hot.
// Cold flow — producer block runs for each collector independently
val coldFlow: Flow<Int> = flow {
    println("Producer starting") // Printed once per collector
    emit(1)
    emit(2)
    emit(3)
}

// Two collectors → producer block runs twice
coldFlow.collect { println("Collector A: $it") }
coldFlow.collect { println("Collector B: $it") }

// Hot flow — always active, values shared across collectors
val _state = MutableStateFlow(0)
val hotFlow: StateFlow<Int> = _state.asStateFlow()

// Both collectors share the same value — no duplication of producer logic.
// Note: collect on a StateFlow never completes, so in real code each
// collector runs in its own coroutine: launch { hotFlow.collect { ... } }
hotFlow.collect { println("Collector A: $it") }
hotFlow.collect { println("Collector B: $it") }

StateFlow vs SharedFlow — when do you use each?
StateFlow is a hot flow that always has a current value (initial value required), replays only the latest value to new subscribers, and supports equality-based de-duplication (emissions equal to the current value are dropped). Use it for UI state — a single source of truth for what the screen currently shows. SharedFlow is more configurable: you choose the replay cache size (0 = no replay, N = last N values), the buffer capacity, and the overflow strategy. Use it for one-shot events (replay=0) like navigation commands or snackbar messages that should not be re-triggered on re-subscription.
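The answer mentions SharedFlow's buffer capacity and overflow strategy, which the ViewModel examples below don't exercise. As a sketch (the UiEvent type and showToast helper are hypothetical), a common configuration gives the flow one extra buffer slot and drops the oldest value on overflow, so tryEmit never fails and the emitter never suspends:

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical event type for illustration
sealed interface UiEvent {
    data class Toast(val message: String) : UiEvent
}

// extraBufferCapacity > 0 lets tryEmit succeed even when no collector is
// ready; DROP_OLDEST discards the stalest buffered event instead of suspending
val events = MutableSharedFlow<UiEvent>(
    replay = 0,
    extraBufferCapacity = 1,
    onBufferOverflow = BufferOverflow.DROP_OLDEST
)

// Non-suspending emit — callable from plain (non-suspend) code
fun showToast(message: String) {
    events.tryEmit(UiEvent.Toast(message))
}
```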
// StateFlow — for UI state
class SearchViewModel : ViewModel() {
    private val _uiState = MutableStateFlow(SearchUiState())
    val uiState: StateFlow<SearchUiState> = _uiState.asStateFlow()
}

// SharedFlow — for one-shot navigation events
class LoginViewModel : ViewModel() {
    // replay=0: new subscribers won't get past events (correct for navigation)
    private val _events = MutableSharedFlow<LoginEvent>(replay = 0)
    val events: SharedFlow<LoginEvent> = _events.asSharedFlow()

    fun onLoginSuccess() {
        viewModelScope.launch {
            _events.emit(LoginEvent.NavigateToDashboard)
        }
    }
}

What is flatMapLatest and when do you need it?
flatMapLatest cancels the Flow produced by the previous emission when a new value arrives upstream, and starts a new Flow for the new value. It is the Flow equivalent of switchMap in RxJava. Use it whenever you want only the result of the latest upstream value and old results should be discarded — the canonical example is a search box where typing a new character should cancel the in-flight search for the previous query and start a fresh one. Without flatMapLatest, you would get results from all previous queries in an unpredictable order.
// Search with flatMapLatest — cancels previous search on new query
val searchQuery = MutableStateFlow("") // backed by the UI text field
val searchResults: StateFlow<List<Result>> = searchQuery
    .debounce(300) // Wait 300ms after user stops typing
    .distinctUntilChanged() // Ignore if same query typed again
    .flatMapLatest { query ->
        if (query.isBlank()) {
            flowOf(emptyList()) // Empty query → clear results immediately
        } else {
            repository.search(query) // Cancelled when a new query arrives
        }
    }
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        initialValue = emptyList()
    )

How do you test a Flow in Android?
Use the Turbine library (app.cash.turbine:turbine) for structured Flow testing. Turbine gives you a test { } extension on any Flow that lets you call awaitItem() to wait for the next emission, awaitComplete() to assert completion, and awaitError() to assert exceptions. For ViewModel testing, use runTest with a StandardTestDispatcher or UnconfinedTestDispatcher, and replace SharingStarted.WhileSubscribed with SharingStarted.Eagerly in tests (or use backgroundScope). Always cancel the test scope after assertions to avoid hanging tests.
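The backgroundScope option mentioned above also works without Turbine. A minimal, self-contained sketch (the doubled flow is a toy stand-in for a real ViewModel's stateIn flow): stateIn is launched in backgroundScope, which runTest cancels automatically, so the shared upstream cannot hang the test.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

fun main() = runTest {
    val source = MutableStateFlow(1) // toy upstream
    val doubled = source
        .map { it * 2 }
        // backgroundScope is cancelled when runTest finishes
        .stateIn(backgroundScope, SharingStarted.Eagerly, initialValue = 0)

    runCurrent() // let the background collector process the first value
    check(doubled.value == 2)

    source.value = 5
    runCurrent()
    check(doubled.value == 10)
}
```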
// build.gradle.kts
// testImplementation("app.cash.turbine:turbine:1.1.0")

@Test
fun `search results update on new query`() = runTest {
    val viewModel = SearchViewModel(
        repository = FakeSearchRepository(),
        dispatcher = UnconfinedTestDispatcher(testScheduler)
    )
    viewModel.searchResults.test {
        // Initial empty state
        assertEquals(emptyList<Result>(), awaitItem())
        // Type a query
        viewModel.onQueryChanged("kotlin")
        val results = awaitItem()
        assertThat(results).isNotEmpty()
        assertThat(results.first().title).contains("Kotlin")
        cancelAndIgnoreRemainingEvents()
    }
}

// Testing error states
@Test
fun `network error shows error state`() = runTest {
    val repo = FakeRepository(shouldFail = true)
    val viewModel = MyViewModel(repo)
    viewModel.uiState.test {
        assertEquals(UiState.Loading, awaitItem())
        val errorState = awaitItem()
        assertThat(errorState).isInstanceOf(UiState.Error::class.java)
        cancelAndIgnoreRemainingEvents()
    }
}

What is stateIn() and what does WhileSubscribed(5000) do?
stateIn() converts a cold upstream Flow into a hot StateFlow. It takes three parameters: the coroutine scope that owns the sharing, a SharingStarted policy that controls when the upstream Flow is active, and an initial value. WhileSubscribed(stopTimeoutMillis) keeps the upstream active for stopTimeoutMillis milliseconds after the last subscriber cancels. Setting it to 5000ms (5 seconds) means the upstream database query stays active for 5 seconds after the UI leaves the screen — which prevents the query from being cancelled and restarted during a configuration change (screen rotation takes about 300ms, well under the 5 second buffer).
// Without stateIn: every collector starts its own database query
// val messages: Flow<List<Message>> = repository.getMessages() // cold

// With stateIn: all collectors share one upstream query
val messages: StateFlow<List<Message>> = repository
    .getMessages() // cold Room Flow
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(stopTimeoutMillis = 5_000),
        initialValue = emptyList()
    )

// SharingStarted options:
// - Eagerly: starts immediately, never stops (use in tests or singletons)
// - Lazily: starts on first subscriber, never stops after that
// - WhileSubscribed(N): starts on first subscriber, stops N ms after last unsubscribes
//   → Use WhileSubscribed(5000) in ViewModels for production code

What is callbackFlow and when do you use it?
callbackFlow is a Flow builder designed for wrapping callback-based APIs — anything that delivers values through listeners, event handlers, or callbacks rather than suspension functions. Unlike the regular flow { } builder, callbackFlow can emit from a different coroutine or thread (regular flow throws if you try to emit from a different context). Use it to bridge OkHttp WebSocket callbacks, Android system callbacks (LocationListener, ContentObserver), or SSE response bodies into a Flow. Always implement awaitClose { } to clean up the callback registration when the collector cancels.
// Wrapping OkHttp WebSocket in callbackFlow
fun observeWebSocket(url: String): Flow<WebSocketMessage> = callbackFlow {
    val client = OkHttpClient()
    val request = Request.Builder().url(url).build()
    val listener = object : WebSocketListener() {
        override fun onMessage(webSocket: WebSocket, text: String) {
            // trySend is non-suspending — safe to call from OkHttp's thread
            trySend(WebSocketMessage.Text(text))
        }

        override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
            trySend(WebSocketMessage.Binary(bytes))
        }

        override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
            close(t) // Terminates the Flow with an error
        }

        override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
            close() // Terminates the Flow normally
        }
    }
    val webSocket = client.newWebSocket(request, listener)

    // awaitClose is called when the collector cancels.
    // Use it to clean up — cancel the WebSocket connection
    awaitClose {
        webSocket.cancel()
    }
}

How does shareIn() work and what is replay?
shareIn() converts a cold Flow into a SharedFlow, sharing a single upstream subscription among multiple collectors. The replay parameter controls how many past emissions are buffered and re-delivered to new subscribers. replay=0 means no values are cached — new subscribers get only future emissions. replay=1 means the latest emission is cached and immediately delivered to new subscribers (like StateFlow without the type constraint). replay=N means the last N emissions are cached. Unlike stateIn(), shareIn() does not require an initial value, making it better for event streams.
// Shared database query — one Room query, many UI subscribers
val sharedMessages: SharedFlow<List<Message>> = repository
    .getMessages()
    .shareIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5_000),
        replay = 1 // New subscribers immediately get the latest list
    )

// Event bus pattern with replay=0
val networkEvents: SharedFlow<NetworkEvent> = networkMonitor
    .observe()
    .shareIn(
        scope = applicationScope,
        started = SharingStarted.Eagerly,
        replay = 0 // Events are fire-and-forget, no caching
    )

// replay=1 vs StateFlow:
// StateFlow requires an initial value + enforces same-value de-duplication
// shareIn(replay=1) is more flexible but lacks the current-value guarantee
// Use StateFlow for UI state, shareIn for general event sharing

What are the key Flow operators? Walk me through debounce, conflate, buffer, combine, and zip.
These operators control timing, backpressure, and combining of multiple flows. debounce(N) waits N milliseconds after the last emission before letting it through — ideal for search boxes to avoid firing on every keystroke. conflate() drops intermediate values when the collector is slow, keeping only the latest — good for progress updates where only the current percentage matters. buffer(N) creates a channel of capacity N between producer and collector, allowing the producer to run ahead without blocking — good for CPU-bound processing. combine() takes the latest values from multiple flows and emits whenever any of them changes. zip() pairs emissions one-to-one — it waits for both flows and stops when either completes.
// debounce — wait 300ms after last keystroke
searchQuery
    .debounce(300)
    .collect { query -> search(query) }

// conflate — collector only processes latest, drops intermediates
progressFlow
    .conflate()
    .collect { progress -> updateProgressBar(progress) }

// buffer — producer and collector run concurrently
dataSource
    .buffer(capacity = 64)
    .collect { item -> processItem(item) } // processing can lag without blocking producer

// combine — latest from each flow, emits on any change
val uiState = combine(
    userFlow,
    settingsFlow,
    networkStatusFlow
) { user, settings, networkStatus ->
    UiState(user = user, settings = settings, isOnline = networkStatus.isConnected)
}

// zip — strict one-to-one pairing
val zipped = flowA.zip(flowB) { a, b -> Pair(a, b) }
// If flowA emits 1, 2, 3 and flowB emits "a", "b":
// zipped emits (1,"a"), (2,"b") — stops because flowB completes

How do you handle errors in Flow?
Flow provides several operators for error handling. catch() catches upstream exceptions and lets you emit a fallback value or re-throw; it does not catch exceptions thrown by the collector — only by the upstream. retry() and retryWhen() re-subscribe to the upstream on error. Exceptions thrown in the collector itself must be handled with an ordinary try/catch around the collect call. For UI state, the recommended pattern is to catch errors in the repository layer, transform them into sealed class states (Result.Error), and propagate them as normal emissions so the ViewModel and UI can react without exception propagation.
// Pattern 1: catch operator — emit fallback on error
repository.getData()
    .catch { e ->
        Timber.e(e, "Failed to load data")
        emit(emptyList()) // Fallback value
    }
    .collect { data -> updateUi(data) }

// Pattern 2: Result wrapping — cleanest for UI state
sealed class Result<out T> {
    data class Success<T>(val data: T) : Result<T>()
    data class Error(val exception: Throwable) : Result<Nothing>()
    object Loading : Result<Nothing>()
}

fun getData(): Flow<Result<List<Item>>> = flow {
    emit(Result.Loading)
    try {
        val data = api.getItems()
        emit(Result.Success(data))
    } catch (e: IOException) {
        emit(Result.Error(e))
    }
}

// Pattern 3: retry with exponential backoff (pow from kotlin.math)
repository.getStream()
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(2.0.pow(attempt.toInt()).toLong() * 1000) // 1s, 2s, 4s
            true // retry
        } else {
            false // do not retry, propagate the error
        }
    }
    .collect { /* ... */ }

What is the difference between collect and collectLatest?
collect processes every emission sequentially. If the collector is slow (e.g., doing a network call per item), it will process each item fully before moving to the next, potentially building up a backlog if the producer is fast. collectLatest cancels the current collector block and restarts it with the latest value whenever a new emission arrives. This is ideal for search results or UI updates where only the most recent value matters and processing an old value is wasteful. Think of collectLatest as 'always work on the newest item, cancel whatever you were doing before.'
// collect — processes every item, even if new ones arrive
flow.collect { value ->
    delay(1000) // Slow processing
    updateUi(value) // Will process all values, even stale ones
}

// collectLatest — cancels current block when new value arrives
flow.collectLatest { value ->
    delay(1000) // If new value arrives during this delay...
    updateUi(value) // ...this line may never execute for old values
}

// Practical example: typing in a search box
searchQuery.collectLatest { query ->
    // If user types "k", then "ko", then "kot" within 1 second:
    // - "k" processing is cancelled when "ko" arrives
    // - "ko" processing is cancelled when "kot" arrives
    // - Only "kot" processing completes
    val results = repository.search(query) // network call
    _results.value = results
}

How do you cancel a Flow?
Flows respect structured concurrency — they are cancelled when their collecting coroutine is cancelled. In a ViewModel, launch a coroutine in viewModelScope; it will be cancelled when the ViewModel is cleared. In Compose, use collectAsStateWithLifecycle() which ties collection to the lifecycle of the composable. For manual cancellation, cancel the Job returned by launch { flow.collect { } }. You can also cancel from within the flow by calling currentCoroutineContext().cancel() or by throwing a CancellationException, but this is rarely needed — prefer cancelling the scope.
// Automatic cancellation via viewModelScope
class MyViewModel : ViewModel() {
    init {
        viewModelScope.launch {
            // This collection is automatically cancelled when the ViewModel is cleared
            repository.getUpdates().collect { update ->
                _state.value = update
            }
        }
    }
}

// Manual cancellation via Job
val job = viewModelScope.launch {
    someFlow.collect { /* ... */ }
}
// Cancel when needed (e.g., user navigates away before the ViewModel clears)
job.cancel()

// Compose — lifecycle-aware collection (preferred over LaunchedEffect + collect)
@Composable
fun MyScreen(viewModel: MyViewModel) {
    // Automatically starts/stops collection based on Lifecycle.State.STARTED
    val state by viewModel.uiState.collectAsStateWithLifecycle()
}

What is distinctUntilChanged() for?
distinctUntilChanged() filters out consecutive duplicate emissions. If the upstream emits the same value twice in a row, the second emission is dropped. This is particularly useful for avoiding unnecessary recompositions in the UI when a value is re-emitted unchanged. Note that StateFlow already applies this de-duplication (using structural equality, i.e. equals), but you may need distinctUntilChanged() on regular cold flows, or when combining multiple flows where the combined value might not change.
// Without distinctUntilChanged — UI redraws even if value is the same
userPreferences
    .collect { prefs ->
        updateTheme(prefs.theme) // Called every time prefs emits, even if theme unchanged
    }

// With distinctUntilChanged — only triggers when theme actually changes
userPreferences
    .map { it.theme }
    .distinctUntilChanged() // Drop if same theme emitted twice in a row
    .collect { theme ->
        updateTheme(theme) // Only called when theme actually changes
    }

// Key selector — compare only the theme field
userPreferences
    .distinctUntilChangedBy { it.theme }
    .collect { /* ... */ }

// Custom comparator — values are considered equal (and dropped) when BOTH
// theme and language are unchanged, so this emits when either field changes
userPreferences
    .distinctUntilChanged { old, new ->
        old.theme == new.theme && old.language == new.language
    }
    .collect { /* ... */ }

What is the difference between flow { } and channelFlow { }?
The regular flow { } builder is sequential — all emissions come from the builder's own coroutine (emitting from a different coroutine throws an IllegalStateException), and only one emit can be in flight at a time. channelFlow { } uses a Channel under the hood, allowing you to emit from multiple coroutines concurrently via send() or trySend(). Use channelFlow when you need to launch concurrent coroutines inside the producer (e.g., fetching from multiple data sources in parallel and merging results). callbackFlow is built on top of channelFlow and requires awaitClose() for resource cleanup.
// flow { } — sequential, single-coroutine producer
val sequentialFlow = flow {
    emit(fetchFromDatabase()) // Sequential
    emit(fetchFromNetwork()) // Runs after the database fetch completes
}

// channelFlow — concurrent producers
val concurrentFlow = channelFlow {
    // Launch two concurrent fetches
    launch {
        val dbResult = fetchFromDatabase()
        send(dbResult) // safe to call from a launched coroutine
    }
    launch {
        val networkResult = fetchFromNetwork()
        send(networkResult) // concurrent with the database fetch
    }
    // Both results are emitted as they complete, in whichever order they finish
}

// Real-world use: merge local cache and network in parallel
fun getProducts(): Flow<List<Product>> = channelFlow {
    // Immediately send cached data
    launch { send(cache.getProducts()) }
    // Concurrently fetch fresh data and send when ready
    launch {
        val fresh = api.getProducts()
        cache.save(fresh)
        send(fresh)
    }
}

How do you merge multiple Flows?
There are several ways to merge Flows depending on the semantics you need. merge() from kotlinx.coroutines interleaves emissions from multiple flows as they arrive — use it when you want all emissions from all sources with no synchronization. combine() emits whenever any upstream emits, using the latest value from all others — use it for UI state derived from multiple independent sources. zip() pairs emissions one-to-one and stops when the shortest flow completes. flattenMerge() is used when you have a Flow of Flows and want to flatten them with a given concurrency limit.
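As a quick, self-contained illustration of merge's interleaving semantics (toy flows, not from a real data source) — all emissions from all sources arrive, but the relative order between sources is not synchronized:

```kotlin
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val a = flowOf(1, 2)
    val b = flowOf(10, 20)
    // merge interleaves emissions as they arrive; since the interleaving
    // order between sources is unspecified, compare as a set
    val merged = merge(a, b).toList()
    check(merged.size == 4)
    check(merged.toSet() == setOf(1, 2, 10, 20))
}
```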
import kotlinx.coroutines.flow.merge

// merge — interleave all emissions from multiple flows as they arrive
val allEvents: Flow<Event> = merge(
    locationEvents,
    networkEvents,
    authEvents
)
// Emits whichever event arrives first, from any source

// combine — emit derived state when any source changes
val dashboardState: Flow<DashboardState> = combine(
    userFlow,    // latest user
    metricsFlow, // latest metrics
    alertsFlow   // latest alerts
) { user, metrics, alerts ->
    DashboardState(user = user, metrics = metrics, alerts = alerts)
}
// Re-emits whenever user, metrics, OR alerts changes

// flattenMerge — flatten a Flow of Flows with a concurrency limit
val results: Flow<SearchResult> = queries
    .map { query -> repository.search(query) } // Flow<Flow<SearchResult>>
    .flattenMerge(concurrency = 4) // Run up to 4 searches concurrently

What dispatcher should you use for Flow collection?
By default, a Flow executes in the coroutine context of its collector — if you collect on Dispatchers.Main, the producer runs on Main too. Use flowOn() to change the upstream execution context without affecting the downstream; flowOn() only affects operators upstream of where it is placed. For CPU-intensive work in the producer (parsing, sorting), use flowOn(Dispatchers.Default). For IO in the producer (database, network), use flowOn(Dispatchers.IO). Don't collect UI state on Dispatchers.IO in a ViewModel — collect on viewModelScope's default (Main) dispatcher and keep IO work in the repository or behind flowOn. Room's Flow queries already run on Room's own background query executor, so you do not need flowOn for Room queries.
// flowOn — run the upstream producer on a different dispatcher
val processedData: Flow<Data> = repository
    .getRawData() // IO operations inside (Room/network)
    .map { raw -> cpuIntensiveTransform(raw) } // CPU-intensive
    .flowOn(Dispatchers.Default) // Run everything above this on Default
// Everything below flowOn() still runs on the collector's dispatcher

// Collect on Main (correct for ViewModels)
viewModelScope.launch { // viewModelScope uses the Main dispatcher
    processedData.collect { data ->
        _uiState.value = data // Main-thread safe
    }
}

// Room Flow — already dispatched correctly, no flowOn needed
val messages: Flow<List<Message>> = messageDao.getAll()
// Room runs the query on its own background executor

// What NOT to do:
viewModelScope.launch(Dispatchers.IO) { // Wrong dispatcher for UI collection
    uiStateFlow.collect { state ->
        // StateFlow updates are thread-safe, but any UI work done in this
        // collector would run off the main thread
        _uiState.value = state
    }
}

Quick Reference Summary
Practice Coroutines interactively
The Kotlin Coroutines chapter in our course covers Flow, channels, structured concurrency, and testing — with exercises and quizzes.