You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

Sockets.kt 4.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package com.dmdirc.ktirc.io
  2. import kotlinx.coroutines.CancellableContinuation
  3. import kotlinx.coroutines.CoroutineScope
  4. import kotlinx.coroutines.io.ByteChannel
  5. import kotlinx.coroutines.io.ByteWriteChannel
  6. import kotlinx.coroutines.io.close
  7. import kotlinx.coroutines.launch
  8. import kotlinx.coroutines.suspendCancellableCoroutine
  9. import kotlinx.io.pool.useInstance
  10. import java.net.SocketAddress
  11. import java.nio.ByteBuffer
  12. import java.nio.channels.*
  13. import kotlin.coroutines.resume
  14. import kotlin.coroutines.resumeWithException
  15. internal interface Socket {
  16. fun bind(socketAddress: SocketAddress)
  17. suspend fun connect(socketAddress: SocketAddress)
  18. suspend fun read(): ByteBuffer?
  19. fun close()
  20. val write: ByteWriteChannel
  21. val isOpen: Boolean
  22. }
  23. internal class PlainTextSocket(private val scope: CoroutineScope) : Socket {
  24. private val client = AsynchronousSocketChannel.open()
  25. private var writeChannel = ByteChannel(autoFlush = true)
  26. override val write: ByteWriteChannel
  27. get() = writeChannel
  28. override val isOpen: Boolean
  29. get() = client.isOpen
  30. override fun bind(socketAddress: SocketAddress) {
  31. client.bind(socketAddress)
  32. }
  33. override suspend fun connect(socketAddress: SocketAddress) {
  34. writeChannel = ByteChannel(autoFlush = true)
  35. suspendCancellableCoroutine<Unit> { continuation ->
  36. client.closeOnCancel(continuation)
  37. client.connect(socketAddress, continuation, AsyncVoidIOHandler)
  38. }
  39. scope.launch { writeLoop() }
  40. }
  41. override fun close() {
  42. writeChannel.close()
  43. client.close()
  44. }
  45. override suspend fun read(): ByteBuffer? {
  46. val buffer = byteBufferPool.borrow()
  47. try {
  48. val bytes = suspendCancellableCoroutine<Int> { continuation ->
  49. client.closeOnCancel(continuation)
  50. client.read(buffer, continuation, asyncIOHandler())
  51. }
  52. if (bytes == -1) {
  53. close()
  54. }
  55. buffer.flip()
  56. return buffer
  57. } catch (_: ClosedChannelException) {
  58. // Ignore
  59. byteBufferPool.recycle(buffer)
  60. return null
  61. }
  62. }
  63. private suspend fun writeLoop() {
  64. while (client.isOpen) {
  65. byteBufferPool.useInstance { buffer ->
  66. writeChannel.readAvailable(buffer)
  67. buffer.flip()
  68. try {
  69. suspendCancellableCoroutine<Int> { continuation ->
  70. client.closeOnCancel(continuation)
  71. client.write(buffer, continuation, asyncIOHandler())
  72. }
  73. } catch (_: ClosedChannelException) {
  74. // Ignore
  75. }
  76. }
  77. }
  78. }
  79. }
  80. private fun Channel.closeOnCancel(cont: CancellableContinuation<*>) {
  81. cont.invokeOnCancellation {
  82. try {
  83. close()
  84. } catch (ex: Throwable) {
  85. // Specification says that it is Ok to call it any time, but reality is different,
  86. // so we have just to ignore exception
  87. }
  88. }
  89. }
  90. @Suppress("UNCHECKED_CAST")
  91. private fun <T> asyncIOHandler(): CompletionHandler<T, CancellableContinuation<T>> =
  92. AsyncIOHandlerAny as CompletionHandler<T, CancellableContinuation<T>>
  93. private object AsyncIOHandlerAny : CompletionHandler<Any, CancellableContinuation<Any>> {
  94. override fun completed(result: Any, cont: CancellableContinuation<Any>) {
  95. cont.resume(result)
  96. }
  97. override fun failed(ex: Throwable, cont: CancellableContinuation<Any>) {
  98. // just return if already cancelled and got an expected exception for that case
  99. if (ex is AsynchronousCloseException && cont.isCancelled) return
  100. cont.resumeWithException(ex)
  101. }
  102. }
  103. private object AsyncVoidIOHandler : CompletionHandler<Void?, CancellableContinuation<Unit>> {
  104. override fun completed(result: Void?, cont: CancellableContinuation<Unit>) {
  105. cont.resume(Unit)
  106. }
  107. override fun failed(ex: Throwable, cont: CancellableContinuation<Unit>) {
  108. // just return if already cancelled and got an expected exception for that case
  109. if (ex is AsynchronousCloseException && cont.isCancelled) return
  110. cont.resumeWithException(ex)
  111. }
  112. }