KNET obsługuje Server-Sent Events (SSE), NDJSON streaming i chunked responses.
import rip.nerd.kitsunenet.streaming.KNETBufferedStreamingClient
// Połącz z SSE endpoint
KNETBufferedStreamingClient.sse("https://api.example.com/events")
.collect { event ->
println("Event: ${event.event}")
println("Data: ${event.data}")
println("ID: ${event.id}")
}
data class SSEEvent(
val event: String?, // Typ eventu (opcjonalny)
val data: String, // Dane
val id: String?, // ID eventu (opcjonalny)
val retry: Long? // Retry interval (opcjonalny)
)
KNETBufferedStreamingClient.sse(
url = "https://api.example.com/events",
headers = mapOf("Authorization" to "Bearer $token")
).collect { event ->
handleEvent(event)
}
KNETBufferedStreamingClient.sse(url)
.filter { it.event == "message" }
.map { parseMessage(it.data) }
.collect { message ->
displayMessage(message)
}
// Streaming JSON objects
KNETBufferedStreamingClient.ndjson(
"https://api.example.com/stream"
).collect { jsonObject ->
// Każda linia to osobny JSON object
val id = jsonObject["id"]
val data = jsonObject["data"]
processData(id, data)
}
// Z typowaniem
data class StreamItem(val id: Int, val value: String)
KNETBufferedStreamingClient.ndjson<StreamItem>(url)
.collect { item ->
println("${item.id}: ${item.value}")
}
// Dla dużych odpowiedzi przychodzących w chunks
KNETBufferedStreamingClient.chunked(url)
.collect { chunk ->
// chunk to ByteArray
processChunk(chunk)
}
// Lub jako String chunks
KNETBufferedStreamingClient.chunkedText(url)
.collect { textChunk ->
appendToDisplay(textChunk)
}
// Automatyczne long polling
val poller = KNETLongPolling(
url = "https://api.example.com/poll",
client = client,
pollInterval = 0, // Natychmiast po odpowiedzi
timeout = 30_000 // 30s timeout per request
)
poller.start { response ->
val updates = response.jsonList<Update>()
processUpdates(updates)
}
// Zatrzymaj
poller.stop()
// Z timeout progression
val poller = KNETLongPolling.builder(url)
.initialTimeout(5_000)
.maxTimeout(30_000)
.timeoutProgression { current -> current * 2 }
.build()
class RealTimeUpdatesRepository {
private var sseJob: Job? = null
private val _updates = MutableSharedFlow<Update>()
val updates: SharedFlow<Update> = _updates
fun startListening(token: String) {
sseJob = scope.launch {
KNETBufferedStreamingClient.sse(
url = "https://api.example.com/updates/stream",
headers = mapOf("Authorization" to "Bearer $token")
)
.retry(3) { cause ->
delay(5000)
cause is IOException
}
.catch { e ->
Log.e("SSE", "Error: ${e.message}")
emit(SSEEvent(event = "error", data = e.message ?: ""))
}
.collect { event ->
when (event.event) {
"update" -> {
val update = parseUpdate(event.data)
_updates.emit(update)
}
"heartbeat" -> {
// Ignoruj heartbeat
}
"error" -> {
handleError(event.data)
}
}
}
}
}
fun stopListening() {
sseJob?.cancel()
sseJob = null
}
}
class SSEChatViewModel : ViewModel() {
private val _messages = MutableStateFlow<List<Message>>(emptyList())
val messages: StateFlow<List<Message>> = _messages
init {
viewModelScope.launch {
KNETBufferedStreamingClient.sse("$baseUrl/chat/stream")
.filter { it.event == "message" }
.map { event ->
Json.decodeFromString<Message>(event.data)
}
.collect { message ->
_messages.update { it + message }
}
}
}
fun sendMessage(text: String) {
viewModelScope.launch {
client.post("$baseUrl/chat/messages", mapOf("text" to text))
}
}
}
data class StockQuote(
val symbol: String,
val price: Double,
val change: Double,
val timestamp: Long
)
class StockTickerViewModel : ViewModel() {
private val _quotes = MutableStateFlow<Map<String, StockQuote>>(emptyMap())
val quotes: StateFlow<Map<String, StockQuote>> = _quotes
fun subscribeToSymbols(symbols: List<String>) {
val symbolsParam = symbols.joinToString(",")
viewModelScope.launch {
KNETBufferedStreamingClient.ndjson<StockQuote>(
"$baseUrl/stocks/stream?symbols=$symbolsParam"
).collect { quote ->
_quotes.update { current ->
current + (quote.symbol to quote)
}
}
}
}
}