Du kan inte välja fler än 25 ämnen Ämnen måste starta med en bokstav eller siffra, kan innehålla bindestreck ('-') och vara max 35 tecken långa.

LineBufferedSocket.kt 3.1KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package com.dmdirc.ktirc.io
  2. import com.dmdirc.ktirc.util.logger
  3. import io.ktor.network.selector.ActorSelectorManager
  4. import io.ktor.network.sockets.Socket
  5. import io.ktor.network.sockets.aSocket
  6. import io.ktor.network.sockets.openReadChannel
  7. import io.ktor.network.sockets.openWriteChannel
  8. import io.ktor.network.util.ioCoroutineDispatcher
  9. import kotlinx.coroutines.CoroutineScope
  10. import kotlinx.coroutines.channels.ReceiveChannel
  11. import kotlinx.coroutines.channels.produce
  12. import kotlinx.coroutines.io.ByteReadChannel
  13. import kotlinx.coroutines.io.ByteWriteChannel
  14. import java.net.InetSocketAddress
  15. interface LineBufferedSocket {
  16. suspend fun connect()
  17. fun disconnect()
  18. suspend fun sendLine(line: ByteArray, offset: Int = 0, length: Int = line.size)
  19. suspend fun sendLine(line: String)
  20. fun readLines(coroutineScope: CoroutineScope): ReceiveChannel<ByteArray>
  21. }
  22. /**
  23. * Asynchronous socket that buffers incoming data and emits individual lines.
  24. */
  25. // TODO: TLS options
  26. class KtorLineBufferedSocket(private val host: String, private val port: Int): LineBufferedSocket {
  27. companion object {
  28. const val CARRIAGE_RETURN = '\r'.toByte()
  29. const val LINE_FEED = '\n'.toByte()
  30. }
  31. private val log by logger()
  32. private lateinit var socket: Socket
  33. private lateinit var readChannel: ByteReadChannel
  34. private lateinit var writeChannel: ByteWriteChannel
  35. override suspend fun connect() {
  36. log.info { "Connecting..." }
  37. socket = aSocket(ActorSelectorManager(ioCoroutineDispatcher)).tcp().connect(InetSocketAddress(host, port))
  38. readChannel = socket.openReadChannel()
  39. writeChannel = socket.openWriteChannel()
  40. }
  41. override fun disconnect() {
  42. log.info { "Disconnecting..." }
  43. socket.close()
  44. }
  45. override suspend fun sendLine(line: ByteArray, offset: Int, length: Int) {
  46. with (writeChannel) {
  47. log.fine { ">>> ${String(line, offset, length)}" }
  48. writeAvailable(line, offset, length)
  49. writeByte(CARRIAGE_RETURN)
  50. writeByte(LINE_FEED)
  51. flush()
  52. }
  53. }
  54. override suspend fun sendLine(line: String) = sendLine(line.toByteArray())
  55. override fun readLines(coroutineScope: CoroutineScope) = coroutineScope.produce {
  56. val lineBuffer = ByteArray(4096)
  57. var index = 0
  58. while (!readChannel.isClosedForRead) {
  59. var start = index
  60. val count = readChannel.readAvailable(lineBuffer, index, lineBuffer.size - index)
  61. for (i in index until index + count) {
  62. if (lineBuffer[i] == CARRIAGE_RETURN || lineBuffer[i] == LINE_FEED) {
  63. if (start < i) {
  64. val line = lineBuffer.sliceArray(start until i)
  65. log.fine { "<<< ${String(line)}" }
  66. send(line)
  67. }
  68. start = i + 1
  69. }
  70. }
  71. lineBuffer.copyInto(lineBuffer, 0, start)
  72. index = count + index - start
  73. }
  74. }
  75. }