You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

anope2json.py 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/python3
  2. import re
  3. import json
  4. import logging
  5. import sys
  6. from collections import defaultdict, namedtuple
  7. AnopeObject = namedtuple('AnopeObject', ('type', 'kv'))
  8. MASK_MAGIC_REGEX = re.compile(r'[*?!@]')
  9. def access_level_to_amode(level):
  10. # https://wiki.anope.org/index.php/2.0/Modules/cs_xop
  11. if level == 'QOP':
  12. return 'q'
  13. elif level == 'SOP':
  14. return 'a'
  15. elif level == 'AOP':
  16. return 'o'
  17. elif level == 'HOP':
  18. return 'h'
  19. elif level == 'VOP':
  20. return 'v'
  21. try:
  22. level = int(level)
  23. except:
  24. return None
  25. if level >= 10000:
  26. return 'q'
  27. elif level >= 9999:
  28. return 'a'
  29. elif level >= 5:
  30. return 'o'
  31. elif level >= 4:
  32. return 'h'
  33. elif level >= 3:
  34. return 'v'
  35. else:
  36. return None
  37. def to_unixnano(timestamp):
  38. return int(timestamp) * (10**9)
  39. def file_to_objects(infile):
  40. result = []
  41. obj = None
  42. for line in infile:
  43. pieces = line.rstrip('\r\n').split(' ', maxsplit=2)
  44. if len(pieces) == 0:
  45. logging.warning("skipping blank line in db")
  46. continue
  47. if pieces[0] == 'END':
  48. result.append(obj)
  49. obj = None
  50. elif pieces[0] == 'OBJECT':
  51. obj = AnopeObject(pieces[1], {})
  52. elif pieces[0] == 'DATA':
  53. obj.kv[pieces[1]] = pieces[2]
  54. else:
  55. raise ValueError("unknown command found in anope db", pieces[0])
  56. return result
  57. ANOPE_MODENAME_TO_MODE = {
  58. 'NOEXTERNAL': 'n',
  59. 'TOPIC': 't',
  60. 'INVITE': 'i',
  61. 'NOCTCP': 'C',
  62. 'AUDITORIUM': 'u',
  63. 'SECRET': 's',
  64. }
  65. def convert(infile):
  66. out = {
  67. 'version': 1,
  68. 'source': 'anope',
  69. 'users': defaultdict(dict),
  70. 'channels': defaultdict(dict),
  71. }
  72. objects = file_to_objects(infile)
  73. lastmode_channels = set()
  74. for obj in objects:
  75. if obj.type == 'NickCore':
  76. username = obj.kv['display']
  77. userdata = {'name': username, 'hash': obj.kv['pass'], 'email': obj.kv['email']}
  78. out['users'][username] = userdata
  79. elif obj.type == 'NickAlias':
  80. username = obj.kv['nc']
  81. nick = obj.kv['nick']
  82. userdata = out['users'][username]
  83. if username.lower() == nick.lower():
  84. userdata['registeredAt'] = to_unixnano(obj.kv['time_registered'])
  85. else:
  86. if 'additionalNicks' not in userdata:
  87. userdata['additionalNicks'] = []
  88. userdata['additionalNicks'].append(nick)
  89. elif obj.type == 'ChannelInfo':
  90. chname = obj.kv['name']
  91. founder = obj.kv['founder']
  92. chdata = {
  93. 'name': chname,
  94. 'founder': founder,
  95. 'registeredAt': to_unixnano(obj.kv['time_registered']),
  96. 'topic': obj.kv['last_topic'],
  97. 'topicSetBy': obj.kv['last_topic_setter'],
  98. 'topicSetAt': to_unixnano(obj.kv['last_topic_time']),
  99. 'amode': {founder: 'q',}
  100. }
  101. # DATA last_modes INVITE KEY,hunter2 NOEXTERNAL REGISTERED TOPIC
  102. last_modes = obj.kv.get('last_modes')
  103. if last_modes:
  104. modes = []
  105. for mode_desc in last_modes.split():
  106. if ',' in mode_desc:
  107. mode_name, mode_value = mode_desc.split(',', maxsplit=1)
  108. else:
  109. mode_name, mode_value = mode_desc, None
  110. if mode_name == 'KEY':
  111. chdata['key'] = mode_value
  112. else:
  113. modes.append(ANOPE_MODENAME_TO_MODE.get(mode_name, ''))
  114. chdata['modes'] = ''.join(modes)
  115. # prevent subsequent ModeLock objects from modifying the mode list further:
  116. lastmode_channels.add(chname)
  117. out['channels'][chname] = chdata
  118. elif obj.type == 'ModeLock':
  119. if obj.kv.get('set') != '1':
  120. continue
  121. chname = obj.kv['ci']
  122. if chname in lastmode_channels:
  123. continue
  124. chdata = out['channels'][chname]
  125. modename = obj.kv['name']
  126. if modename == 'KEY':
  127. chdata['key'] = obj.kv['param']
  128. else:
  129. oragono_mode = ANOPE_MODENAME_TO_MODE.get(modename)
  130. if oragono_mode is not None:
  131. stored_modes = chdata.get('modes', '')
  132. stored_modes += oragono_mode
  133. chdata['modes'] = stored_modes
  134. elif obj.type == 'ChanAccess':
  135. chname = obj.kv['ci']
  136. target = obj.kv['mask']
  137. mode = access_level_to_amode(obj.kv['data'])
  138. if mode is None:
  139. continue
  140. if MASK_MAGIC_REGEX.search(target):
  141. continue
  142. chdata = out['channels'][chname]
  143. amode = chdata.setdefault('amode', {})
  144. amode[target] = mode
  145. chdata['amode'] = amode
  146. # do some basic integrity checks
  147. for chname, chdata in out['channels'].items():
  148. founder = chdata.get('founder')
  149. if founder not in out['users']:
  150. raise ValueError("no user corresponding to channel founder", chname, chdata.get('founder'))
  151. return out
  152. def main():
  153. if len(sys.argv) != 3:
  154. raise Exception("Usage: anope2json.py anope.db output.json")
  155. with open(sys.argv[1]) as infile:
  156. output = convert(infile)
  157. with open(sys.argv[2], 'w') as outfile:
  158. json.dump(output, outfile)
  159. if __name__ == '__main__':
  160. logging.basicConfig()
  161. sys.exit(main())