Last active
March 21, 2024 15:24
-
-
Save Rattlyy/01de975eb12af3c4878f87f1bb39dcec to your computer and use it in GitHub Desktop.
Utility to use SSE with Kotlin flows on Quarkus
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import jakarta.ws.rs.sse.Sse
import jakarta.ws.rs.sse.SseEventSink
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.takeWhile
/**
 * Value holder for one event: a name and its data string.
 *
 * @property name the event name
 * @property data the event payload
 */
private data class Event(
    val name: String,
    val data: String,
)
/**
 * In-process bus that forwards named events to SSE sinks, grouped by a string id.
 *
 * Every id is backed by one [MutableSharedFlow] created with default settings
 * (no replay), so an event published while nobody is collecting is not delivered
 * to later subscribers.
 *
 * NOTE(review): entries are never removed from the map, so ids that are used once
 * stay registered for the lifetime of the process.
 */
object SseBus {
    // This singleton is shared by every caller, so lookup-or-create must be atomic;
    // a plain mutable map with a check-then-put sequence can lose a flow under contention.
    private val flows = ConcurrentHashMap<String, MutableSharedFlow<Event>>()

    /**
     * Emits an event to every sink currently attached to [id].
     *
     * If no flow has been registered for [id] yet, the call does nothing: with a
     * non-replaying flow the event would be dropped anyway, and skipping the
     * registration avoids growing the map for ids nobody listens to.
     *
     * @param id identifier of the channel to publish on
     * @param eventName name of the event, `"message"` by default
     * @param data payload of the event, `"."` by default
     */
    suspend fun publish(id: String, eventName: String = "message", data: String = ".") {
        flows[id]?.emit(Event(eventName, data))
    }

    /**
     * Attaches [sink] to the channel [id] and forwards each published event to it.
     *
     * Suspends while events are being forwarded and returns normally once the sink
     * is found closed. The closed state is only checked when an event arrives, so a
     * closed sink stays attached until the next [publish] on the same id.
     *
     * @param sink destination the events are sent to
     * @param sse factory used to build the outbound events
     * @param id identifier of the channel to listen on
     */
    suspend fun flow(sink: SseEventSink, sse: Sse, id: String) {
        flows.computeIfAbsent(id) { MutableSharedFlow<Event>() }
            // Stop collecting on a closed sink instead of cancelling the enclosing
            // scope, which would surface a CancellationException to the caller.
            .takeWhile { !sink.isClosed }
            .collectLatest { event ->
                sink.send(sse.newEvent(event.name, event.data))
            }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment