You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

IrcClientImpl.kt 7.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. package com.dmdirc.ktirc
  2. import com.dmdirc.ktirc.events.*
  3. import com.dmdirc.ktirc.events.handlers.eventHandlers
  4. import com.dmdirc.ktirc.events.mutators.eventMutators
  5. import com.dmdirc.ktirc.io.LineBufferedSocket
  6. import com.dmdirc.ktirc.io.LineBufferedSocketImpl
  7. import com.dmdirc.ktirc.io.MessageHandler
  8. import com.dmdirc.ktirc.io.MessageParser
  9. import com.dmdirc.ktirc.messages.*
  10. import com.dmdirc.ktirc.messages.processors.messageProcessors
  11. import com.dmdirc.ktirc.model.*
  12. import com.dmdirc.ktirc.util.RemoveIn
  13. import com.dmdirc.ktirc.util.currentTimeProvider
  14. import com.dmdirc.ktirc.util.generateLabel
  15. import com.dmdirc.ktirc.util.logger
  16. import kotlinx.coroutines.*
  17. import kotlinx.coroutines.channels.Channel
  18. import kotlinx.coroutines.channels.map
  19. import kotlinx.coroutines.sync.Mutex
  20. import java.net.Inet6Address
  21. import java.net.InetAddress
  22. import java.time.Duration
  23. import java.util.logging.Level
  24. internal data class IrcClientConfig(
  25. val server: ServerConfig,
  26. val profile: ProfileConfig,
  27. val behaviour: ClientBehaviour,
  28. val sasl: SaslConfig?)
  29. internal data class IrcPingTimeouts(
  30. override val sendPeriod: Duration,
  31. override val responseGracePeriod: Duration,
  32. override val incomingLinesResetTimer: Boolean) : PingTimeouts
  33. /**
  34. * Concrete implementation of an [IrcClient].
  35. */
  36. internal class IrcClientImpl(private val config: IrcClientConfig) : ExperimentalIrcClient, CoroutineScope {
  37. private val log by logger()
  38. @ExperimentalCoroutinesApi
  39. override val coroutineContext = GlobalScope.newCoroutineContext(Dispatchers.IO)
  40. @ExperimentalCoroutinesApi
  41. internal var socketFactory: (CoroutineScope, String, String, Int, Boolean) -> LineBufferedSocket = ::LineBufferedSocketImpl
  42. internal var resolver: (String) -> Collection<ResolveResult> = { host ->
  43. InetAddress.getAllByName(host).map { ResolveResult(it.hostAddress, it is Inet6Address) }
  44. }
  45. internal var asyncTimeout = Duration.ofSeconds(20)
  46. override var behaviour = config.behaviour
  47. override val serverState = ServerState(config.profile.nickname, config.server.host, config.sasl)
  48. override val channelState = ChannelStateMap { caseMapping }
  49. override val localUser = User(config.profile.nickname)
  50. override val userState = UserState { caseMapping }.apply { this += localUser }
  51. private val messageHandler = MessageHandler(messageProcessors, eventMutators, eventHandlers)
  52. private val messageBuilder = MessageBuilder()
  53. private val parser = MessageParser()
  54. private var socket: LineBufferedSocket? = null
  55. private val connecting = Mutex(false)
  56. @Deprecated("Use structured send instead", ReplaceWith("send(command, arguments)"))
  57. @RemoveIn("2.0.0")
  58. override fun send(message: String) {
  59. socket?.sendChannel?.offer(message.toByteArray()) ?: log.warning { "No send channel for message: $message" }
  60. }
  61. override fun send(tags: Map<MessageTag, String>, command: String, vararg arguments: String) {
  62. maybeEchoMessage(command, arguments)
  63. socket?.sendChannel?.offer(messageBuilder.build(tags, command, arguments))
  64. ?: log.warning { "No send channel for command: $command" }
  65. }
  66. override fun sendAsync(tags: Map<MessageTag, String>, command: String, arguments: Array<String>, matcher: (IrcEvent) -> Boolean) = async {
  67. val label = generateLabel(this@IrcClientImpl)
  68. val channel = Channel<IrcEvent>(1)
  69. if (serverState.asyncResponseState.supportsLabeledResponses) {
  70. serverState.asyncResponseState.pendingResponses[label] = channel to { event -> event.metadata.label == label }
  71. send(tags + (MessageTag.Label to label), command, *arguments)
  72. } else {
  73. serverState.asyncResponseState.pendingResponses[label] = channel to matcher
  74. send(tags, command, *arguments)
  75. }
  76. withTimeoutOrNull(asyncTimeout.toMillis()) {
  77. channel.receive()
  78. }.also { serverState.asyncResponseState.pendingResponses.remove(label) }
  79. }
  80. override fun connect() {
  81. check(connecting.tryLock()) { "IrcClient is already connected to a server" }
  82. val ip: String
  83. try {
  84. ip = resolve(config.server.host)
  85. } catch (ex: Exception) {
  86. log.log(Level.SEVERE, ex) { "Error resolving ${config.server.host}" }
  87. emitEvent(ServerConnectionError(EventMetadata(currentTimeProvider()), ConnectionError.UnresolvableAddress, ex.localizedMessage))
  88. reset()
  89. return
  90. }
  91. @Suppress("EXPERIMENTAL_API_USAGE")
  92. with(socketFactory(this, config.server.host, ip, config.server.port, config.server.useTls)) {
  93. socket = this
  94. emitEvent(ServerConnecting(EventMetadata(currentTimeProvider())))
  95. launch {
  96. try {
  97. connect()
  98. emitEvent(ServerConnected(EventMetadata(currentTimeProvider())))
  99. sendCapabilityList()
  100. sendPasswordIfPresent()
  101. sendNickChange(config.profile.nickname)
  102. sendUser(config.profile.username, config.profile.realName)
  103. messageHandler.processMessages(this@IrcClientImpl, receiveChannel.map { parser.parse(it) })
  104. } catch (ex: Exception) {
  105. log.log(Level.SEVERE, ex) { "Error connecting to ${config.server.host}:${config.server.port}" }
  106. emitEvent(ServerConnectionError(EventMetadata(currentTimeProvider()), ex.toConnectionError(), ex.localizedMessage))
  107. }
  108. reset()
  109. emitEvent(ServerDisconnected(EventMetadata(currentTimeProvider())))
  110. }
  111. }
  112. }
  113. override fun disconnect() {
  114. runBlocking {
  115. socket?.disconnect()
  116. connecting.lock()
  117. connecting.unlock()
  118. }
  119. }
  120. override fun onEvent(handler: (IrcEvent) -> Unit) = messageHandler.addEmitter(handler)
  121. private fun emitEvent(event: IrcEvent) = messageHandler.handleEvent(this, event)
  122. private fun sendPasswordIfPresent() = config.server.password?.let(this::sendPassword)
  123. private fun maybeEchoMessage(command: String, arguments: Array<out String>) {
  124. // TODO: Is this the best place to do it? It'd be nicer to actually build the message and
  125. // reflect the raw line back through all the processors etc.
  126. if (command == "PRIVMSG" && behaviour.alwaysEchoMessages && !serverState.capabilities.enabledCapabilities.contains(Capability.EchoMessages)) {
  127. emitEvent(MessageReceived(
  128. EventMetadata(currentTimeProvider()),
  129. localUser,
  130. arguments[0],
  131. arguments[1]
  132. ))
  133. }
  134. }
  135. private fun resolve(host: String): String {
  136. val hosts = resolver(host)
  137. val preferredHosts = hosts.filter { it.isV6 == behaviour.preferIPv6 }
  138. return if (preferredHosts.isNotEmpty()) {
  139. preferredHosts.random().ip
  140. } else {
  141. hosts.random().ip
  142. }
  143. }
  144. internal fun reset() {
  145. serverState.reset()
  146. channelState.clear()
  147. userState.reset()
  148. localUser.reset(config.profile.nickname)
  149. userState += localUser
  150. socket = null
  151. connecting.tryLock()
  152. connecting.unlock()
  153. }
  154. }
  155. internal data class ResolveResult(val ip: String, val isV6: Boolean)