You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

anope2json.py 6.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. #!/usr/bin/python3
  2. import binascii
  3. import json
  4. import logging
  5. import re
  6. import sys
  7. from collections import defaultdict, namedtuple
  8. AnopeObject = namedtuple('AnopeObject', ('type', 'kv'))
  9. MASK_MAGIC_REGEX = re.compile(r'[*?!@]')
  10. def access_level_to_amode(level):
  11. # https://wiki.anope.org/index.php/2.0/Modules/cs_xop
  12. if level == 'QOP':
  13. return 'q'
  14. elif level == 'SOP':
  15. return 'a'
  16. elif level == 'AOP':
  17. return 'o'
  18. elif level == 'HOP':
  19. return 'h'
  20. elif level == 'VOP':
  21. return 'v'
  22. try:
  23. level = int(level)
  24. except:
  25. return None
  26. if level >= 10000:
  27. return 'q'
  28. elif level >= 9999:
  29. return 'a'
  30. elif level >= 5:
  31. return 'o'
  32. elif level >= 4:
  33. return 'h'
  34. elif level >= 3:
  35. return 'v'
  36. else:
  37. return None
  38. def to_unixnano(timestamp):
  39. return int(timestamp) * (10**9)
  40. def file_to_objects(infile):
  41. result = []
  42. obj = None
  43. while True:
  44. line = infile.readline()
  45. if not line:
  46. break
  47. line = line.rstrip(b'\r\n')
  48. try:
  49. line = line.decode('utf-8')
  50. except UnicodeDecodeError:
  51. line = line.decode('utf-8', 'replace')
  52. logging.warning("line contained invalid utf8 data " + line)
  53. pieces = line.split(' ', maxsplit=2)
  54. if len(pieces) == 0:
  55. logging.warning("skipping blank line in db")
  56. continue
  57. if pieces[0] == 'END':
  58. result.append(obj)
  59. obj = None
  60. elif pieces[0] == 'OBJECT':
  61. obj = AnopeObject(pieces[1], {})
  62. elif pieces[0] == 'DATA':
  63. obj.kv[pieces[1]] = pieces[2]
  64. elif pieces[0] == 'ID':
  65. # not sure what these do?
  66. continue
  67. else:
  68. raise ValueError("unknown command found in anope db", pieces[0])
  69. return result
  70. ANOPE_MODENAME_TO_MODE = {
  71. 'NOEXTERNAL': 'n',
  72. 'TOPIC': 't',
  73. 'INVITE': 'i',
  74. 'NOCTCP': 'C',
  75. 'AUDITORIUM': 'u',
  76. 'SECRET': 's',
  77. }
  78. # verify that a certfp appears to be a hex-encoded SHA-256 fingerprint;
  79. # if it's anything else, silently ignore it
  80. def validate_certfps(certobj):
  81. certfps = []
  82. for fingerprint in certobj.split():
  83. try:
  84. dec = binascii.unhexlify(fingerprint)
  85. except:
  86. continue
  87. if len(dec) == 32:
  88. certfps.append(fingerprint)
  89. return certfps
  90. def convert(infile):
  91. out = {
  92. 'version': 1,
  93. 'source': 'anope',
  94. 'users': defaultdict(dict),
  95. 'channels': defaultdict(dict),
  96. }
  97. objects = file_to_objects(infile)
  98. lastmode_channels = set()
  99. for obj in objects:
  100. if obj.type == 'NickCore':
  101. username = obj.kv['display']
  102. userdata = {'name': username, 'hash': obj.kv['pass'], 'email': obj.kv['email']}
  103. certobj = obj.kv.get('cert')
  104. if certobj:
  105. userdata['certfps'] = validate_certfps(certobj)
  106. out['users'][username] = userdata
  107. elif obj.type == 'NickAlias':
  108. username = obj.kv['nc']
  109. nick = obj.kv['nick']
  110. userdata = out['users'][username]
  111. if username.lower() == nick.lower():
  112. userdata['registeredAt'] = to_unixnano(obj.kv['time_registered'])
  113. else:
  114. if 'additionalNicks' not in userdata:
  115. userdata['additionalNicks'] = []
  116. userdata['additionalNicks'].append(nick)
  117. elif obj.type == 'ChannelInfo':
  118. chname = obj.kv['name']
  119. founder = obj.kv['founder']
  120. chdata = {
  121. 'name': chname,
  122. 'founder': founder,
  123. 'registeredAt': to_unixnano(obj.kv['time_registered']),
  124. 'topic': obj.kv['last_topic'],
  125. 'topicSetBy': obj.kv['last_topic_setter'],
  126. 'topicSetAt': to_unixnano(obj.kv['last_topic_time']),
  127. 'amode': {founder: 'q',}
  128. }
  129. # DATA last_modes INVITE KEY,hunter2 NOEXTERNAL REGISTERED TOPIC
  130. last_modes = obj.kv.get('last_modes')
  131. if last_modes:
  132. modes = []
  133. for mode_desc in last_modes.split():
  134. if ',' in mode_desc:
  135. mode_name, mode_value = mode_desc.split(',', maxsplit=1)
  136. else:
  137. mode_name, mode_value = mode_desc, None
  138. if mode_name == 'KEY':
  139. chdata['key'] = mode_value
  140. else:
  141. modes.append(ANOPE_MODENAME_TO_MODE.get(mode_name, ''))
  142. chdata['modes'] = ''.join(modes)
  143. # prevent subsequent ModeLock objects from modifying the mode list further:
  144. lastmode_channels.add(chname)
  145. out['channels'][chname] = chdata
  146. elif obj.type == 'ModeLock':
  147. if obj.kv.get('set') != '1':
  148. continue
  149. chname = obj.kv['ci']
  150. if chname in lastmode_channels:
  151. continue
  152. chdata = out['channels'][chname]
  153. modename = obj.kv['name']
  154. if modename == 'KEY':
  155. chdata['key'] = obj.kv['param']
  156. else:
  157. oragono_mode = ANOPE_MODENAME_TO_MODE.get(modename)
  158. if oragono_mode is not None:
  159. stored_modes = chdata.get('modes', '')
  160. stored_modes += oragono_mode
  161. chdata['modes'] = stored_modes
  162. elif obj.type == 'ChanAccess':
  163. chname = obj.kv['ci']
  164. target = obj.kv['mask']
  165. mode = access_level_to_amode(obj.kv['data'])
  166. if mode is None:
  167. continue
  168. if MASK_MAGIC_REGEX.search(target):
  169. continue
  170. chdata = out['channels'][chname]
  171. amode = chdata.setdefault('amode', {})
  172. amode[target] = mode
  173. chdata['amode'] = amode
  174. # do some basic integrity checks
  175. for chname, chdata in out['channels'].items():
  176. founder = chdata.get('founder')
  177. if founder not in out['users']:
  178. raise ValueError("no user corresponding to channel founder", chname, chdata.get('founder'))
  179. return out
  180. def main():
  181. if len(sys.argv) != 3:
  182. raise Exception("Usage: anope2json.py anope.db output.json")
  183. with open(sys.argv[1], 'rb') as infile:
  184. output = convert(infile)
  185. with open(sys.argv[2], 'w') as outfile:
  186. json.dump(output, outfile)
  187. if __name__ == '__main__':
  188. logging.basicConfig()
  189. sys.exit(main())