📺 Streaming & SSE

KNET obsługuje Server-Sent Events (SSE), streaming NDJSON, odpowiedzi typu chunked oraz long polling.

Server-Sent Events (SSE)

import rip.nerd.kitsunenet.streaming.KNETBufferedStreamingClient

// Connect to an SSE endpoint and print the type, payload and ID of every event
val eventStream = KNETBufferedStreamingClient.sse("https://api.example.com/events")
eventStream.collect { sse ->
    println("Event: ${sse.event}")
    println("Data: ${sse.data}")
    println("ID: ${sse.id}")
}

Struktura zdarzenia SSE

/**
 * A single Server-Sent Event as delivered to collectors.
 *
 * @property event Event type (optional).
 * @property data Event payload.
 * @property id Event ID (optional).
 * @property retry Retry interval (optional); presumably milliseconds, as in the
 *   SSE `retry:` field — TODO confirm.
 */
data class SSEEvent(
    val event: String?,
    val data: String,
    val id: String?,
    val retry: Long?,
)

Z autoryzacją

// Send extra request headers (here: a bearer token) when opening the stream
val authHeaders = mapOf("Authorization" to "Bearer $token")
KNETBufferedStreamingClient
    .sse(url = "https://api.example.com/events", headers = authHeaders)
    .collect { sse -> handleEvent(sse) }

Filtrowanie eventów

// Keep only "message" events, parse their payload and display the result
KNETBufferedStreamingClient.sse(url)
    .filter { sse -> sse.event == "message" }
    .map { sse -> parseMessage(sse.data) }
    .collect { parsed -> displayMessage(parsed) }

NDJSON (Newline Delimited JSON)

// Streaming JSON objects: every line of the response is a separate JSON object
val ndjsonUrl = "https://api.example.com/stream"
KNETBufferedStreamingClient.ndjson(ndjsonUrl).collect { line ->
    processData(line["id"], line["data"])
}

// Typed variant: items are collected as StreamItem instead of raw JSON objects
data class StreamItem(val id: Int, val value: String)

KNETBufferedStreamingClient.ndjson<StreamItem>(url)
    .collect { (id, value) -> println("$id: $value") }

Chunked Response

// For large responses that arrive in chunks; each chunk is a ByteArray
KNETBufferedStreamingClient.chunked(url).collect { bytes -> processChunk(bytes) }

// Or receive the chunks as Strings
KNETBufferedStreamingClient.chunkedText(url).collect { text -> appendToDisplay(text) }

Long Polling

// Automatic long polling
val poller = KNETLongPolling(
    url = "https://api.example.com/poll",
    client = client,
    pollInterval = 0,  // poll again immediately after a response
    timeout = 30_000   // 30s timeout per request
)

// Hand every response's list of updates to the application
poller.start { response -> processUpdates(response.jsonList<Update>()) }

// Stop polling
poller.stop()

// With timeout progression: start at 5s and let the supplied function compute
// the next timeout from the current one, bounded by maxTimeout.
// NOTE(review): exactly when the progression function is applied is not shown here — confirm.
// Given its own name so it does not redeclare `poller` from the example above
// when both snippets are pasted into the same scope.
val backoffPoller = KNETLongPolling.builder(url)
    .initialTimeout(5_000)
    .maxTimeout(30_000)
    .timeoutProgression { current -> current * 2 }
    .build()

Przykład: Real-time updates

/**
 * Example repository that republishes "update" events from an SSE stream
 * through the [updates] flow.
 *
 * NOTE(review): `scope`, `parseUpdate` and `handleError` are not declared in
 * this snippet; they are assumed to be supplied by the surrounding code.
 */
class RealTimeUpdatesRepository {

    // Handle of the running collector; null while not listening.
    private var sseJob: Job? = null
    private val _updates = MutableSharedFlow<Update>()
    val updates: SharedFlow<Update> = _updates

    /**
     * Starts collecting the update stream, authorizing with [token].
     *
     * A stream started by an earlier call is cancelled first, so at most one
     * collector is active at a time.
     */
    fun startListening(token: String) {
        // Without this, a second call would overwrite the handle of the first
        // collector and leave it running with no way to stop it.
        sseJob?.cancel()
        sseJob = scope.launch {
            KNETBufferedStreamingClient.sse(
                url = "https://api.example.com/updates/stream",
                headers = mapOf("Authorization" to "Bearer $token")
            )
                .retry(3) { cause ->
                    // Retry I/O failures only, and wait 5s only when a retry will follow.
                    val retryable = cause is IOException
                    if (retryable) delay(5000)
                    retryable
                }
                .catch { e ->
                    Log.e("SSE", "Error: ${e.message}")
                    // Surface the failure as a synthetic "error" event; all four
                    // SSEEvent fields are passed because none of them has a default.
                    emit(SSEEvent(event = "error", data = e.message ?: "", id = null, retry = null))
                }
                .collect { event ->
                    when (event.event) {
                        "update" -> _updates.emit(parseUpdate(event.data))
                        "heartbeat" -> Unit // heartbeats are ignored
                        "error" -> handleError(event.data)
                    }
                }
        }
    }

    /** Cancels the running collector, if any, and clears its handle. */
    fun stopListening() {
        sseJob?.cancel()
        sseJob = null
    }
}

Przykład: Chat z SSE

/**
 * Example chat ViewModel: appends every "message" event from the chat stream
 * to [messages] and posts outgoing messages.
 *
 * NOTE(review): `baseUrl` and `client` are not declared in this snippet; they
 * are assumed to be supplied by the surrounding code.
 */
class SSEChatViewModel : ViewModel() {

    private val _messages = MutableStateFlow<List<Message>>(emptyList())
    val messages: StateFlow<List<Message>> = _messages

    init {
        viewModelScope.launch {
            val incoming = KNETBufferedStreamingClient.sse("$baseUrl/chat/stream")
                .filter { sse -> sse.event == "message" }
                .map { sse -> Json.decodeFromString<Message>(sse.data) }

            // Append each decoded message to the current list.
            incoming.collect { received ->
                _messages.update { history -> history + received }
            }
        }
    }

    /** Posts [text] to the chat messages endpoint from the ViewModel's scope. */
    fun sendMessage(text: String) {
        viewModelScope.launch {
            val payload = mapOf("text" to text)
            client.post("$baseUrl/chat/messages", payload)
        }
    }
}

Przykład: Stock ticker

/**
 * One quote received from the stock stream.
 *
 * @property symbol Ticker symbol; used as the key when quotes are stored per symbol.
 * @property price Quoted price.
 * @property change Price change (absolute vs. percentage is not shown here — TODO confirm).
 * @property timestamp Time of the quote (unit and epoch not shown here — TODO confirm).
 */
data class StockQuote(
    val symbol: String,
    val price: Double,
    val change: Double,
    val timestamp: Long,
)

/**
 * Example ViewModel that keeps the most recent [StockQuote] per symbol in [quotes].
 *
 * NOTE(review): `baseUrl` is not declared in this snippet; it is assumed to be
 * supplied by the surrounding code.
 */
class StockTickerViewModel : ViewModel() {

    private val _quotes = MutableStateFlow<Map<String, StockQuote>>(emptyMap())
    val quotes: StateFlow<Map<String, StockQuote>> = _quotes

    // Handle of the active stream so a new subscription can replace it.
    private var streamJob: Job? = null

    /**
     * Subscribes to the quote stream for [symbols].
     *
     * The symbols are sent comma-separated in the `symbols` query parameter.
     * A stream opened by an earlier call is cancelled first; previously every
     * call added another collector that was never stopped. Quotes already
     * stored in [quotes] are kept.
     */
    fun subscribeToSymbols(symbols: List<String>) {
        val symbolsParam = symbols.joinToString(",")

        streamJob?.cancel()
        streamJob = viewModelScope.launch {
            KNETBufferedStreamingClient.ndjson<StockQuote>(
                "$baseUrl/stocks/stream?symbols=$symbolsParam"
            ).collect { quote ->
                // Adding the pair replaces any older quote stored under the same symbol.
                _quotes.update { current ->
                    current + (quote.symbol to quote)
                }
            }
        }
    }
}

📚 Zobacz też