improve the network channels for streaming

This commit is contained in:
dece 2021-12-13 22:29:58 +01:00
parent f8b929b00d
commit a2dd6f4876
4 changed files with 101 additions and 59 deletions

View file

@ -1,5 +1,6 @@
package dev.lowrespalmtree.comet package dev.lowrespalmtree.comet
import android.util.Log
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
@ -18,6 +19,8 @@ class PreTextLine(val text: String) : Line
class BlockquoteLine(val text: String) : Line class BlockquoteLine(val text: String) : Line
class ListItemLine(val text: String) : Line class ListItemLine(val text: String) : Line
const val TAG = "Gemtext"
fun parseData( fun parseData(
inChannel: Channel<ByteArray>, inChannel: Channel<ByteArray>,
charset: Charset, charset: Charset,
@ -27,6 +30,7 @@ fun parseData(
scope.launch { scope.launch {
var isPref = false var isPref = false
var buffer = ByteArray(0) var buffer = ByteArray(0)
Log.d(TAG, "parseData: start getting data from channel")
for (data in inChannel) { for (data in inChannel) {
buffer += data buffer += data
var nextLineFeed: Int = -1 var nextLineFeed: Int = -1
@ -41,6 +45,7 @@ fun parseData(
channel.send(line) channel.send(line)
} }
} }
Log.d(TAG, "parseData: channel closed")
channel.close() channel.close()
} }
return channel return channel

View file

@ -18,10 +18,13 @@ import androidx.lifecycle.viewModelScope
import androidx.recyclerview.widget.LinearLayoutManager import androidx.recyclerview.widget.LinearLayoutManager
import dev.lowrespalmtree.comet.databinding.ActivityMainBinding import dev.lowrespalmtree.comet.databinding.ActivityMainBinding
import kotlinx.coroutines.* import kotlinx.coroutines.*
import kotlinx.coroutines.channels.onSuccess
import java.net.ConnectException import java.net.ConnectException
import java.net.SocketTimeoutException
import java.net.UnknownHostException import java.net.UnknownHostException
import java.nio.charset.Charset import java.nio.charset.Charset
@ExperimentalCoroutinesApi
class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
private lateinit var binding: ActivityMainBinding private lateinit var binding: ActivityMainBinding
private lateinit var pageViewModel: PageViewModel private lateinit var pageViewModel: PageViewModel
@ -92,10 +95,7 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
} }
when (uri.scheme) { when (uri.scheme) {
"gemini" -> { "gemini" -> pageViewModel.sendGeminiRequest(uri)
currentUrl = uri.toString()
pageViewModel.sendGeminiRequest(uri)
}
else -> openUnknownScheme(uri) else -> openUnknownScheme(uri)
} }
} }
@ -125,7 +125,10 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
Log.d(TAG, "handleEvent: $event") Log.d(TAG, "handleEvent: $event")
if (!event.handled) { if (!event.handled) {
when (event) { when (event) {
is PageViewModel.SuccessEvent -> visitedUrls.add(event.uri) is PageViewModel.SuccessEvent -> {
currentUrl = event.uri
visitedUrls.add(event.uri)
}
is PageViewModel.RedirectEvent -> openUrl(event.uri, redirections = event.redirects) is PageViewModel.RedirectEvent -> openUrl(event.uri, redirections = event.redirects)
is PageViewModel.FailureEvent -> alert(event.message) is PageViewModel.FailureEvent -> alert(event.message)
} }
@ -188,7 +191,9 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
signalError( signalError(
when (e) { when (e) {
is UnknownHostException -> "Unknown host \"${uri.authority}\"." is UnknownHostException -> "Unknown host \"${uri.authority}\"."
is ConnectException -> "Can't connect to this server: ${e.message}." is ConnectException -> "Can't connect to this server: ${e.localizedMessage}."
is SocketTimeoutException -> "Connection timed out."
is CancellationException -> "Connection cancelled: ${e.message}."
else -> "Oops, something failed!" else -> "Oops, something failed!"
} }
) )
@ -204,7 +209,10 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
Log.i(TAG, "sendRequest: got ${response.code} with meta \"${response.meta}\"") Log.i(TAG, "sendRequest: got ${response.code} with meta \"${response.meta}\"")
when (response.code.getCategory()) { when (response.code.getCategory()) {
Response.Code.Category.SUCCESS -> handleRequestSuccess(response, uri) Response.Code.Category.SUCCESS -> handleRequestSuccess(response, uri)
Response.Code.Category.REDIRECT -> handleRedirect(response, redirects = redirects + 1) Response.Code.Category.REDIRECT -> handleRedirect(
response,
redirects = redirects + 1
)
Response.Code.Category.SERVER_ERROR -> handleError(response) Response.Code.Category.SERVER_ERROR -> handleError(response)
else -> signalError("Can't handle code ${response.code}.") else -> signalError("Can't handle code ${response.code}.")
} }
@ -224,16 +232,34 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
val charset = Charset.defaultCharset() val charset = Charset.defaultCharset()
var mainTitle: String? = null var mainTitle: String? = null
var lastUpdate = System.currentTimeMillis() var lastUpdate = System.currentTimeMillis()
for (line in parseData(response.data, charset, viewModelScope)) { var lastNumLines = 0
Log.d(TAG, "handleRequestSuccess: start parsing line data")
try {
val lineChannel = parseData(response.data, charset, viewModelScope)
while (!lineChannel.isClosedForReceive) {
val lineChannelResult = withTimeout(100) { lineChannel.tryReceive() }
lineChannelResult.onSuccess { line ->
linesList.add(line) linesList.add(line)
if (mainTitle == null && line is TitleLine && line.level == 1) if (mainTitle == null && line is TitleLine && line.level == 1)
mainTitle = line.text mainTitle = line.text
}
// Throttle the recycler view updates to 100ms and new content only.
if (linesList.size > lastNumLines) {
val time = System.currentTimeMillis() val time = System.currentTimeMillis()
if (time - lastUpdate >= 100) { // Throttle to 100ms the recycler view updates… if (time - lastUpdate >= 100) {
lines.postValue(linesList) lines.postValue(linesList)
lastUpdate = time lastUpdate = time
lastNumLines = linesList.size
} }
} }
}
} catch (e: CancellationException) {
Log.e(TAG, "handleRequestSuccess: coroutine cancelled: ${e.message}")
state.postValue(State.IDLE)
return
}
Log.d(TAG, "handleRequestSuccess: done parsing line data")
lines.postValue(linesList) lines.postValue(linesList)
// We record the history entry here: it's nice because we have the main title available // We record the history entry here: it's nice because we have the main title available
@ -248,7 +274,9 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
} }
private fun handleError(response: Response) { private fun handleError(response: Response) {
event.postValue(FailureEvent(when (response.code) { event.postValue(
FailureEvent(
when (response.code) {
Response.Code.TEMPORARY_FAILURE -> "40: the server encountered a temporary failure." Response.Code.TEMPORARY_FAILURE -> "40: the server encountered a temporary failure."
Response.Code.SERVER_UNAVAILABLE -> "41: the server is currently unavailable." Response.Code.SERVER_UNAVAILABLE -> "41: the server is currently unavailable."
Response.Code.CGI_ERROR -> "42: a CGI script encountered an error." Response.Code.CGI_ERROR -> "42: a CGI script encountered an error."
@ -260,7 +288,9 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen {
Response.Code.PROXY_REQUEST_REFUSED -> "53: the server refused to proxy the request." Response.Code.PROXY_REQUEST_REFUSED -> "53: the server refused to proxy the request."
Response.Code.BAD_REQUEST -> "59: bad request." Response.Code.BAD_REQUEST -> "59: bad request."
else -> "${response.code}: unknown error code." else -> "${response.code}: unknown error code."
})) }
)
)
} }
} }

View file

@ -7,6 +7,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import java.io.BufferedInputStream import java.io.BufferedInputStream
import java.net.InetSocketAddress
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.security.cert.X509Certificate import java.security.cert.X509Certificate
import javax.net.ssl.SSLContext import javax.net.ssl.SSLContext
@ -20,8 +21,9 @@ class Request(private val uri: Uri) {
Log.d(TAG, "connect") Log.d(TAG, "connect")
val context = SSLContext.getInstance("TLSv1.2") val context = SSLContext.getInstance("TLSv1.2")
context.init(null, arrayOf(TrustManager()), null) context.init(null, arrayOf(TrustManager()), null)
val socket = context.socketFactory.createSocket(uri.host, port) as SSLSocket val socket = context.socketFactory.createSocket() as SSLSocket
socket.soTimeout = 10000 socket.soTimeout = 10000
socket.connect(InetSocketAddress(uri.host, port), 10000)
socket.startHandshake() socket.startHandshake()
return socket return socket
} }
@ -39,13 +41,11 @@ class Request(private val uri: Uri) {
try { try {
@Suppress("BlockingMethodInNonBlockingContext") // what u gonna do @Suppress("BlockingMethodInNonBlockingContext") // what u gonna do
while ((bis.read(buffer).also { numRead = it }) >= 0) { while ((bis.read(buffer).also { numRead = it }) >= 0) {
Log.d(TAG, "proceed coroutine: received $numRead bytes")
val received = buffer.sliceArray(0 until numRead) val received = buffer.sliceArray(0 until numRead)
channel.send(received) channel.send(received)
} }
} catch (e: SocketTimeoutException) { } catch (e: SocketTimeoutException) {
Log.i(TAG, "Socket timeout.") Log.i(TAG, "proceed coroutine: socket timeout.")
channel.cancel()
} }
} }
} }
@ -58,7 +58,8 @@ class Request(private val uri: Uri) {
@SuppressLint("CustomX509TrustManager") @SuppressLint("CustomX509TrustManager")
class TrustManager : X509TrustManager { class TrustManager : X509TrustManager {
@SuppressLint("TrustAllX509TrustManager") @SuppressLint("TrustAllX509TrustManager")
override fun checkClientTrusted(chain: Array<out X509Certificate>?, authType: String?) {} override fun checkClientTrusted(chain: Array<out X509Certificate>?, authType: String?) {
}
override fun checkServerTrusted(chain: Array<out X509Certificate>?, authType: String?) { override fun checkServerTrusted(chain: Array<out X509Certificate>?, authType: String?) {
Log.d(TAG, "cool cert, please continue") Log.d(TAG, "cool cert, please continue")

View file

@ -3,7 +3,7 @@ package dev.lowrespalmtree.comet
import android.util.Log import android.util.Log
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.nio.charset.Charset import java.nio.charset.Charset
@ -56,40 +56,46 @@ class Response(val code: Code, val meta: String, val data: Channel<ByteArray>) {
suspend fun from(channel: Channel<ByteArray>, scope: CoroutineScope): Response? { suspend fun from(channel: Channel<ByteArray>, scope: CoroutineScope): Response? {
var received = 0 var received = 0
val headerBuffer = ByteBuffer.allocate(1024) val headerBuffer = ByteBuffer.allocate(1024)
for (data in channel) { var lfIndex: Int
// While we don't have a response object (i.e. no header parsed), keep reading.
while (true) {
val data = try {
channel.receive()
} catch (e: ClosedReceiveChannelException) {
Log.d(TAG, "companion from: channel closed during initial receive")
return null
}
// Push some data into our buffer. // Push some data into our buffer.
headerBuffer.put(data) headerBuffer.put(data)
received += data.size received += data.size
// Check if there is enough data to parse a Gemini header from it (e.g. has \r\n). // Check if there is enough data to parse a Gemini header from it (e.g. has \r\n).
val lfIndex = headerBuffer.array().indexOf(0x0D) // \r lfIndex = headerBuffer.array().indexOf(0x0D) // \r
if (lfIndex == -1) if (lfIndex == -1)
continue continue
if (headerBuffer.array()[lfIndex + 1] != (0x0A.toByte())) // \n if (headerBuffer.array()[lfIndex + 1] != (0x0A.toByte())) // \n
continue continue
break
}
// We have our header! Parse it to create our Response object. // We have our header! Parse it to create our Response object.
val bytes = headerBuffer.array() val bytes = headerBuffer.array()
val headerData = bytes.sliceArray(0 until lfIndex) val headerData = bytes.sliceArray(0 until lfIndex)
val (code, meta) = parseHeader(headerData) val (code, meta) = parseHeader(headerData)
?: return null.also { Log.e(TAG, "Failed to parse header") } ?: return null.also { Log.e(TAG, "companion from: can't parse header") }
val responseChannel = Channel<ByteArray>() val response = Response(code, meta, Channel())
val response = Response(code, meta, responseChannel)
scope.launch { scope.launch {
// If we got too much data from the channel: push the trailing data first. // If we got too much data from the channel: push the trailing data first.
val trailingIndex = lfIndex + 2 val trailingIndex = lfIndex + 2
if (trailingIndex < received) { if (trailingIndex < received) {
val trailingData = bytes.sliceArray(trailingIndex until received) val trailingData = bytes.sliceArray(trailingIndex until received)
responseChannel.send(trailingData) response.data.send(trailingData)
} }
// Forward all incoming data to the Response channel. // Forward all incoming data to the Response channel.
channel.consumeEach { responseChannel.send(it) } for (data in channel)
responseChannel.close() response.data.send(data)
response.data.close()
} }
// Return the response here; this stops consuming the channel from this for-loop so
// that the coroutine above can take care of it.
return response return response
} }
return null
}
/** Return the code and meta from this header if it could be parsed correctly. */ /** Return the code and meta from this header if it could be parsed correctly. */
private fun parseHeader(data: ByteArray): Pair<Code, String>? { private fun parseHeader(data: ByteArray): Pair<Code, String>? {