You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

IrcClientImpl.kt 5.7KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package com.dmdirc.ktirc
  2. import com.dmdirc.ktirc.events.*
  3. import com.dmdirc.ktirc.events.handlers.eventHandlers
  4. import com.dmdirc.ktirc.events.mutators.eventMutators
  5. import com.dmdirc.ktirc.io.KtorLineBufferedSocket
  6. import com.dmdirc.ktirc.io.LineBufferedSocket
  7. import com.dmdirc.ktirc.io.MessageHandler
  8. import com.dmdirc.ktirc.io.MessageParser
  9. import com.dmdirc.ktirc.messages.*
  10. import com.dmdirc.ktirc.messages.processors.messageProcessors
  11. import com.dmdirc.ktirc.model.*
  12. import com.dmdirc.ktirc.util.currentTimeProvider
  13. import com.dmdirc.ktirc.util.generateLabel
  14. import com.dmdirc.ktirc.util.logger
  15. import io.ktor.util.KtorExperimentalAPI
  16. import kotlinx.coroutines.*
  17. import kotlinx.coroutines.channels.Channel
  18. import kotlinx.coroutines.channels.map
  19. import kotlinx.coroutines.time.withTimeoutOrNull
  20. import java.time.Duration
  21. import java.util.concurrent.atomic.AtomicBoolean
  22. import java.util.logging.Level
  23. /**
  24. * Concrete implementation of an [IrcClient].
  25. */
  26. // TODO: How should alternative nicknames work?
  27. // TODO: Should IRC Client take a pool of servers and rotate through, or make the caller do that?
  28. internal class IrcClientImpl(private val config: IrcClientConfig) : IrcClient, CoroutineScope {
  29. private val log by logger()
  30. @ExperimentalCoroutinesApi
  31. override val coroutineContext = GlobalScope.newCoroutineContext(Dispatchers.IO)
  32. @ExperimentalCoroutinesApi
  33. @KtorExperimentalAPI
  34. internal var socketFactory: (CoroutineScope, String, Int, Boolean) -> LineBufferedSocket = ::KtorLineBufferedSocket
  35. internal var asyncTimeout = Duration.ofSeconds(20)
  36. override var behaviour = config.behaviour
  37. override val serverState = ServerState(config.profile.nickname, config.server.host, config.sasl)
  38. override val channelState = ChannelStateMap { caseMapping }
  39. override val userState = UserState { caseMapping }
  40. private val messageHandler = MessageHandler(messageProcessors, eventMutators, eventHandlers)
  41. private val messageBuilder = MessageBuilder()
  42. private val parser = MessageParser()
  43. private var socket: LineBufferedSocket? = null
  44. private val connecting = AtomicBoolean(false)
  45. override fun send(message: String) {
  46. socket?.sendChannel?.offer(message.toByteArray()) ?: log.warning { "No send channel for message: $message" }
  47. }
  48. override fun send(tags: Map<MessageTag, String>, command: String, vararg arguments: String) {
  49. maybeEchoMessage(command, arguments)
  50. socket?.sendChannel?.offer(messageBuilder.build(tags, command, arguments))
  51. ?: log.warning { "No send channel for command: $command" }
  52. }
  53. override fun sendAsync(tags: Map<MessageTag, String>, command: String, vararg arguments: String) = async {
  54. if (serverState.supportsLabeledResponses) {
  55. val label = generateLabel(this@IrcClientImpl)
  56. val channel = Channel<IrcEvent>(1)
  57. serverState.labelChannels[label] = channel
  58. send(tags + (MessageTag.Label to label), command, *arguments)
  59. withTimeoutOrNull(asyncTimeout) { channel.receive() }.also { serverState.labelChannels.remove(label) }
  60. } else {
  61. send(tags, command, *arguments)
  62. null
  63. }
  64. }
  65. override fun connect() {
  66. check(!connecting.getAndSet(true))
  67. @Suppress("EXPERIMENTAL_API_USAGE")
  68. with(socketFactory(this, config.server.host, config.server.port, config.server.useTls)) {
  69. socket = this
  70. emitEvent(ServerConnecting(EventMetadata(currentTimeProvider())))
  71. launch {
  72. try {
  73. connect()
  74. emitEvent(ServerConnected(EventMetadata(currentTimeProvider())))
  75. sendCapabilityList()
  76. sendPasswordIfPresent()
  77. sendNickChange(config.profile.nickname)
  78. sendUser(config.profile.username, config.profile.realName)
  79. messageHandler.processMessages(this@IrcClientImpl, receiveChannel.map { parser.parse(it) })
  80. } catch (ex: Exception) {
  81. log.log(Level.SEVERE, ex) { "Error connecting to ${config.server.host}:${config.server.port}" }
  82. emitEvent(ServerConnectionError(EventMetadata(currentTimeProvider()), ex.toConnectionError(), ex.localizedMessage))
  83. }
  84. reset()
  85. emitEvent(ServerDisconnected(EventMetadata(currentTimeProvider())))
  86. }
  87. }
  88. }
  89. override fun disconnect() {
  90. socket?.disconnect()
  91. }
  92. override fun onEvent(handler: (IrcEvent) -> Unit) = messageHandler.addEmitter(handler)
  93. private fun emitEvent(event: IrcEvent) = messageHandler.handleEvent(this, event)
  94. private fun sendPasswordIfPresent() = config.server.password?.let(this::sendPassword)
  95. private fun maybeEchoMessage(command: String, arguments: Array<out String>) {
  96. // TODO: Is this the best place to do it? It'd be nicer to actually build the message and
  97. // reflect the raw line back through all the processors etc.
  98. if (command == "PRIVMSG" && behaviour.alwaysEchoMessages && !serverState.capabilities.enabledCapabilities.contains(Capability.EchoMessages)) {
  99. emitEvent(MessageReceived(
  100. EventMetadata(currentTimeProvider()),
  101. userState[serverState.localNickname]?.details ?: User(serverState.localNickname),
  102. arguments[0],
  103. arguments[1]
  104. ))
  105. }
  106. }
  107. internal fun reset() {
  108. serverState.reset()
  109. channelState.clear()
  110. userState.reset()
  111. socket = null
  112. connecting.set(false)
  113. }
  114. }