[FIXED] Run a kotlin flow once but have it received twice downstream

Issue

Scenario

I have a hot flow EventHandler.sharedFlow emitted on a button click.

The flow is received by Repository that performs some action in OnEach{}.

The repository flow is then received by two event collectors EventCollectorA and EventCollectorB.

The event collector flows are then combined and collected in MyViewModel.

Issue

The two event collectors cause onEach{...} to run twice on every click. However I only want to run onEach{...} once and have it received in two event collectors. How can I achieve this?

Note: I am using Hilt to ony have one instance of Repository, EventCollectorA and EventCollectorB

Flow Diagram

Code

@AndroidEntryPoint
class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        val binding = ActivityMainBinding.inflate(layoutInflater)
        setContentView(binding.root)

        val viewModel = ViewModelProvider(this).get(MyViewModel::class.java)
        binding.buttonB.setOnClickListener {
            viewModel.userClickEvent("Click Event")
        }
    }
}
@HiltViewModel
class MyViewModel @Inject constructor(
    private val eventHandler: EventHandler,
    private val eventCollectorA: EventCollectorA,
    private val eventCollectorB: EventCollectorB,
) : ViewModel() {
    fun userClickEvent(event: String) = viewModelScope.launch {
        eventHandler.userClick(event)
    }

    init {
        viewModelScope.launch {
            combine(
                eventCollectorA.sharedFlow,
                eventCollectorB.sharedFlow
            ) { a, b ->
                {/*do something*/}
            }.collect()
        }
    }
}
class EventHandler  {
    private val _sharedFlow = MutableSharedFlow<String>()
    val sharedFlow = _sharedFlow.asSharedFlow()

    suspend fun userClick(event: String) {
        _sharedFlow.emit(event)
    }
}
class Repository constructor(
    eventHandler: EventHandler,
) {
    val sharedFlow = eventHandler.sharedFlow
            .filter { it == "Click Event" }
            .onEach {/*do something*/} /*onEach is called twice on click event. I only want it called once*/ 
            .onStart { emit("Begin") }
}
class EventCollectorA constructor(repository: Repository) {
    val sharedFlow = repository.sharedFlow.map {
        it
    }
}

class EventCollectorB constructor(repository: Repository) {
    val sharedFlow = repository.sharedFlow.map {
        it
    }
}

Solution

The problem here is that while eventHandler.sharedFlow is a SharedFlow, after applying any operators to it, we get a regular, not shared flow. filter(), onEach() and onStart() are running separately for each new collection. If you want to share them between collections, you need to construct another shared flow, after applying them:

    val sharedFlow = eventHandler.sharedFlow
            .filter { it == "Click Event" }
            .onEach {/*do something*/}
            .onStart { emit("Begin") }
            .shareIn(...)   

Further explanation

We need to be aware that a regular, cold flow is not like a live stream of data. It is more like a source of such streams and with each new collection we start entirely new stream of data. For example, if we create a flow using flow { } builder, we have only a single flow object, but if we invoke collect {} multiple times on it, then for each collection the lambda will be invoked again and again. Similarly, each operator that we use to construct a new flow, is also invoked separately for each collection.

You can think of shareIn() as creating a service that observes its upstream flow and duplicates its data to each of its downstream flows. No matter how many times we collect the shared flow, upstream flow will be collected only once. Operators above shareIn() will be invoked once, while operators below shareIn() will be invoked separately for each collection.

Answered By – broot

Answer Checked By – Laura B. (Easybugfix Admin)

Leave a Reply

(*) Required, Your email will not be published