From 21771bc45c06d316dcf15667811b6bb067eeb382 Mon Sep 17 00:00:00 2001 From: dece Date: Mon, 13 Dec 2021 22:29:58 +0100 Subject: [PATCH] improve the network channels for streaming --- .../java/dev/lowrespalmtree/comet/Gemtext.kt | 5 ++ .../dev/lowrespalmtree/comet/MainActivity.kt | 86 +++++++++++++------ .../java/dev/lowrespalmtree/comet/Request.kt | 13 +-- .../java/dev/lowrespalmtree/comet/Response.kt | 56 ++++++------ 4 files changed, 101 insertions(+), 59 deletions(-) diff --git a/app/src/main/java/dev/lowrespalmtree/comet/Gemtext.kt b/app/src/main/java/dev/lowrespalmtree/comet/Gemtext.kt index 1ac3ab6..fce267d 100644 --- a/app/src/main/java/dev/lowrespalmtree/comet/Gemtext.kt +++ b/app/src/main/java/dev/lowrespalmtree/comet/Gemtext.kt @@ -1,5 +1,6 @@ package dev.lowrespalmtree.comet +import android.util.Log import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch @@ -18,6 +19,8 @@ class PreTextLine(val text: String) : Line class BlockquoteLine(val text: String) : Line class ListItemLine(val text: String) : Line +const val TAG = "Gemtext" + fun parseData( inChannel: Channel, charset: Charset, @@ -27,6 +30,7 @@ fun parseData( scope.launch { var isPref = false var buffer = ByteArray(0) + Log.d(TAG, "parseData: start getting data from channel") for (data in inChannel) { buffer += data var nextLineFeed: Int = -1 @@ -41,6 +45,7 @@ fun parseData( channel.send(line) } } + Log.d(TAG, "parseData: channel closed") channel.close() } return channel diff --git a/app/src/main/java/dev/lowrespalmtree/comet/MainActivity.kt b/app/src/main/java/dev/lowrespalmtree/comet/MainActivity.kt index 81e933c..b192f37 100644 --- a/app/src/main/java/dev/lowrespalmtree/comet/MainActivity.kt +++ b/app/src/main/java/dev/lowrespalmtree/comet/MainActivity.kt @@ -18,10 +18,13 @@ import androidx.lifecycle.viewModelScope import androidx.recyclerview.widget.LinearLayoutManager import dev.lowrespalmtree.comet.databinding.ActivityMainBinding import kotlinx.coroutines.* +import kotlinx.coroutines.channels.onSuccess import java.net.ConnectException +import java.net.SocketTimeoutException import java.net.UnknownHostException import java.nio.charset.Charset +@ExperimentalCoroutinesApi class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { private lateinit var binding: ActivityMainBinding private lateinit var pageViewModel: PageViewModel @@ -92,10 +95,7 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { } when (uri.scheme) { - "gemini" -> { - currentUrl = uri.toString() - pageViewModel.sendGeminiRequest(uri) - } + "gemini" -> pageViewModel.sendGeminiRequest(uri) else -> openUnknownScheme(uri) } } @@ -125,7 +125,10 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { Log.d(TAG, "handleEvent: $event") if (!event.handled) { when (event) { - is PageViewModel.SuccessEvent -> visitedUrls.add(event.uri) + is PageViewModel.SuccessEvent -> { + currentUrl = event.uri + visitedUrls.add(event.uri) + } is PageViewModel.RedirectEvent -> openUrl(event.uri, redirections = event.redirects) is PageViewModel.FailureEvent -> alert(event.message) } @@ -188,7 +191,9 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { signalError( when (e) { is UnknownHostException -> "Unknown host \"${uri.authority}\"." - is ConnectException -> "Can't connect to this server: ${e.message}." + is ConnectException -> "Can't connect to this server: ${e.localizedMessage}." + is SocketTimeoutException -> "Connection timed out." + is CancellationException -> "Connection cancelled: ${e.message}." else -> "Oops, something failed!" } ) @@ -204,7 +209,10 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { Log.i(TAG, "sendRequest: got ${response.code} with meta \"${response.meta}\"") when (response.code.getCategory()) { Response.Code.Category.SUCCESS -> handleRequestSuccess(response, uri) - Response.Code.Category.REDIRECT -> handleRedirect(response, redirects = redirects + 1) + Response.Code.Category.REDIRECT -> handleRedirect( + response, + redirects = redirects + 1 + ) Response.Code.Category.SERVER_ERROR -> handleError(response) else -> signalError("Can't handle code ${response.code}.") } @@ -224,16 +232,34 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { val charset = Charset.defaultCharset() var mainTitle: String? = null var lastUpdate = System.currentTimeMillis() - for (line in parseData(response.data, charset, viewModelScope)) { - linesList.add(line) - if (mainTitle == null && line is TitleLine && line.level == 1) - mainTitle = line.text - val time = System.currentTimeMillis() - if (time - lastUpdate >= 100) { // Throttle to 100ms the recycler view updates… - lines.postValue(linesList) - lastUpdate = time + var lastNumLines = 0 + Log.d(TAG, "handleRequestSuccess: start parsing line data") + try { + val lineChannel = parseData(response.data, charset, viewModelScope) + while (!lineChannel.isClosedForReceive) { + val lineChannelResult = withTimeout(100) { lineChannel.tryReceive() } + lineChannelResult.onSuccess { line -> + linesList.add(line) + if (mainTitle == null && line is TitleLine && line.level == 1) + mainTitle = line.text + } + + // Throttle the recycler view updates to 100ms and new content only. + if (linesList.size > lastNumLines) { + val time = System.currentTimeMillis() + if (time - lastUpdate >= 100) { + lines.postValue(linesList) + lastUpdate = time + lastNumLines = linesList.size + } + } } + } catch (e: CancellationException) { + Log.e(TAG, "handleRequestSuccess: coroutine cancelled: ${e.message}") + state.postValue(State.IDLE) + return } + Log.d(TAG, "handleRequestSuccess: done parsing line data") lines.postValue(linesList) // We record the history entry here: it's nice because we have the main title available @@ -248,19 +274,23 @@ class MainActivity : AppCompatActivity(), ContentAdapter.ContentAdapterListen { } private fun handleError(response: Response) { - event.postValue(FailureEvent(when (response.code) { - Response.Code.TEMPORARY_FAILURE -> "40: the server encountered a temporary failure." - Response.Code.SERVER_UNAVAILABLE -> "41: the server is currently unavailable." - Response.Code.CGI_ERROR -> "42: a CGI script encountered an error." - Response.Code.PROXY_ERROR -> "43: the server failed to proxy the request." - Response.Code.SLOW_DOWN -> "44: slow down!" - Response.Code.PERMANENT_FAILURE -> "50: this request failed and similar requests will likely fail as well." - Response.Code.NOT_FOUND -> "51: this page can't be found." - Response.Code.GONE -> "52: this page is gone." - Response.Code.PROXY_REQUEST_REFUSED -> "53: the server refused to proxy the request." - Response.Code.BAD_REQUEST -> "59: bad request." - else -> "${response.code}: unknown error code." - })) + event.postValue( + FailureEvent( + when (response.code) { + Response.Code.TEMPORARY_FAILURE -> "40: the server encountered a temporary failure." + Response.Code.SERVER_UNAVAILABLE -> "41: the server is currently unavailable." + Response.Code.CGI_ERROR -> "42: a CGI script encountered an error." + Response.Code.PROXY_ERROR -> "43: the server failed to proxy the request." + Response.Code.SLOW_DOWN -> "44: slow down!" + Response.Code.PERMANENT_FAILURE -> "50: this request failed and similar requests will likely fail as well." + Response.Code.NOT_FOUND -> "51: this page can't be found." + Response.Code.GONE -> "52: this page is gone." + Response.Code.PROXY_REQUEST_REFUSED -> "53: the server refused to proxy the request." + Response.Code.BAD_REQUEST -> "59: bad request." + else -> "${response.code}: unknown error code." + } + ) + ) } } diff --git a/app/src/main/java/dev/lowrespalmtree/comet/Request.kt b/app/src/main/java/dev/lowrespalmtree/comet/Request.kt index ee7dfe8..6bac3cc 100644 --- a/app/src/main/java/dev/lowrespalmtree/comet/Request.kt +++ b/app/src/main/java/dev/lowrespalmtree/comet/Request.kt @@ -7,6 +7,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.launch import java.io.BufferedInputStream +import java.net.InetSocketAddress import java.net.SocketTimeoutException import java.security.cert.X509Certificate import javax.net.ssl.SSLContext @@ -20,8 +21,9 @@ class Request(private val uri: Uri) { Log.d(TAG, "connect") val context = SSLContext.getInstance("TLSv1.2") context.init(null, arrayOf(TrustManager()), null) - val socket = context.socketFactory.createSocket(uri.host, port) as SSLSocket + val socket = context.socketFactory.createSocket() as SSLSocket socket.soTimeout = 10000 + socket.connect(InetSocketAddress(uri.host, port), 10000) socket.startHandshake() return socket } @@ -39,13 +41,11 @@ class Request(private val uri: Uri) { try { @Suppress("BlockingMethodInNonBlockingContext") // what u gonna do while ((bis.read(buffer).also { numRead = it }) >= 0) { - Log.d(TAG, "proceed coroutine: received $numRead bytes") val received = buffer.sliceArray(0 until numRead) channel.send(received) } } catch (e: SocketTimeoutException) { - Log.i(TAG, "Socket timeout.") - channel.cancel() + Log.i(TAG, "proceed coroutine: socket timeout.") } } } @@ -56,9 +56,10 @@ class Request(private val uri: Uri) { } @SuppressLint("CustomX509TrustManager") - class TrustManager: X509TrustManager { + class TrustManager : X509TrustManager { @SuppressLint("TrustAllX509TrustManager") - override fun checkClientTrusted(chain: Array?, authType: String?) {} + override fun checkClientTrusted(chain: Array?, authType: String?) { + } override fun checkServerTrusted(chain: Array?, authType: String?) { Log.d(TAG, "cool cert, please continue") diff --git a/app/src/main/java/dev/lowrespalmtree/comet/Response.kt b/app/src/main/java/dev/lowrespalmtree/comet/Response.kt index 11bf994..47f5c24 100644 --- a/app/src/main/java/dev/lowrespalmtree/comet/Response.kt +++ b/app/src/main/java/dev/lowrespalmtree/comet/Response.kt @@ -3,7 +3,7 @@ package dev.lowrespalmtree.comet import android.util.Log import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.channels.consumeEach +import kotlinx.coroutines.channels.ClosedReceiveChannelException import kotlinx.coroutines.launch import java.nio.ByteBuffer import java.nio.charset.Charset @@ -56,39 +56,45 @@ class Response(val code: Code, val meta: String, val data: Channel) { suspend fun from(channel: Channel, scope: CoroutineScope): Response? { var received = 0 val headerBuffer = ByteBuffer.allocate(1024) - for (data in channel) { + var lfIndex: Int + // While we don't have a response object (i.e. no header parsed), keep reading. + while (true) { + val data = try { + channel.receive() + } catch (e: ClosedReceiveChannelException) { + Log.d(TAG, "companion from: channel closed during initial receive") + return null + } // Push some data into our buffer. headerBuffer.put(data) received += data.size // Check if there is enough data to parse a Gemini header from it (e.g. has \r\n). - val lfIndex = headerBuffer.array().indexOf(0x0D) // \r + lfIndex = headerBuffer.array().indexOf(0x0D) // \r if (lfIndex == -1) continue if (headerBuffer.array()[lfIndex + 1] != (0x0A.toByte())) // \n continue - // We have our header! Parse it to create our Response object. - val bytes = headerBuffer.array() - val headerData = bytes.sliceArray(0 until lfIndex) - val (code, meta) = parseHeader(headerData) - ?: return null.also { Log.e(TAG, "Failed to parse header") } - val responseChannel = Channel() - val response = Response(code, meta, responseChannel) - scope.launch { - // If we got too much data from the channel: push the trailing data first. - val trailingIndex = lfIndex + 2 - if (trailingIndex < received) { - val trailingData = bytes.sliceArray(trailingIndex until received) - responseChannel.send(trailingData) - } - // Forward all incoming data to the Response channel. - channel.consumeEach { responseChannel.send(it) } - responseChannel.close() - } - // Return the response here; this stops consuming the channel from this for-loop so - // that the coroutine above can take care of it. - return response + break } - return null + // We have our header! Parse it to create our Response object. + val bytes = headerBuffer.array() + val headerData = bytes.sliceArray(0 until lfIndex) + val (code, meta) = parseHeader(headerData) + ?: return null.also { Log.e(TAG, "companion from: can't parse header") } + val response = Response(code, meta, Channel()) + scope.launch { + // If we got too much data from the channel: push the trailing data first. + val trailingIndex = lfIndex + 2 + if (trailingIndex < received) { + val trailingData = bytes.sliceArray(trailingIndex until received) + response.data.send(trailingData) + } + // Forward all incoming data to the Response channel. + for (data in channel) + response.data.send(data) + response.data.close() + } + return response } /** Return the code and meta from this header if it could be parsed correctly. */