You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

anope2json.py 5.3KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #!/usr/bin/python3
  2. import re
  3. import json
  4. import logging
  5. import sys
  6. from collections import defaultdict, namedtuple
  7. AnopeObject = namedtuple('AnopeObject', ('type', 'kv'))
  8. MASK_MAGIC_REGEX = re.compile(r'[*?!@]')
  9. def access_level_to_amode(level):
  10. try:
  11. level = int(level)
  12. except:
  13. return None
  14. if level >= 10000:
  15. return 'q'
  16. elif level >= 9999:
  17. return 'a'
  18. elif level >= 5:
  19. return 'o'
  20. elif level >= 4:
  21. return 'h'
  22. elif level >= 3:
  23. return 'v'
  24. else:
  25. return None
  26. def to_unixnano(timestamp):
  27. return int(timestamp) * (10**9)
  28. def file_to_objects(infile):
  29. result = []
  30. obj = None
  31. for line in infile:
  32. pieces = line.rstrip('\r\n').split(' ', maxsplit=2)
  33. if len(pieces) == 0:
  34. logging.warning("skipping blank line in db")
  35. continue
  36. if pieces[0] == 'END':
  37. result.append(obj)
  38. obj = None
  39. elif pieces[0] == 'OBJECT':
  40. obj = AnopeObject(pieces[1], {})
  41. elif pieces[0] == 'DATA':
  42. obj.kv[pieces[1]] = pieces[2]
  43. else:
  44. raise ValueError("unknown command found in anope db", pieces[0])
  45. return result
  46. ANOPE_MODENAME_TO_MODE = {
  47. 'NOEXTERNAL': 'n',
  48. 'TOPIC': 't',
  49. 'INVITE': 'i',
  50. 'NOCTCP': 'C',
  51. 'AUDITORIUM': 'u',
  52. 'SECRET': 's',
  53. }
  54. def convert(infile):
  55. out = {
  56. 'version': 1,
  57. 'source': 'anope',
  58. 'users': defaultdict(dict),
  59. 'channels': defaultdict(dict),
  60. }
  61. objects = file_to_objects(infile)
  62. lastmode_channels = set()
  63. for obj in objects:
  64. if obj.type == 'NickCore':
  65. username = obj.kv['display']
  66. userdata = {'name': username, 'hash': obj.kv['pass'], 'email': obj.kv['email']}
  67. out['users'][username] = userdata
  68. elif obj.type == 'NickAlias':
  69. username = obj.kv['nc']
  70. nick = obj.kv['nick']
  71. userdata = out['users'][username]
  72. if username.lower() == nick.lower():
  73. userdata['registeredAt'] = to_unixnano(obj.kv['time_registered'])
  74. else:
  75. if 'additionalNicks' not in userdata:
  76. userdata['additionalNicks'] = []
  77. userdata['additionalNicks'].append(nick)
  78. elif obj.type == 'ChannelInfo':
  79. chname = obj.kv['name']
  80. founder = obj.kv['founder']
  81. chdata = {
  82. 'name': chname,
  83. 'founder': founder,
  84. 'registeredAt': to_unixnano(obj.kv['time_registered']),
  85. 'topic': obj.kv['last_topic'],
  86. 'topicSetBy': obj.kv['last_topic_setter'],
  87. 'topicSetAt': to_unixnano(obj.kv['last_topic_time']),
  88. 'amode': {founder: 'q',}
  89. }
  90. # DATA last_modes INVITE KEY,hunter2 NOEXTERNAL REGISTERED TOPIC
  91. last_modes = obj.kv.get('last_modes')
  92. if last_modes:
  93. modes = []
  94. for mode_desc in last_modes.split():
  95. if ',' in mode_desc:
  96. mode_name, mode_value = mode_desc.split(',', maxsplit=1)
  97. else:
  98. mode_name, mode_value = mode_desc, None
  99. if mode_name == 'KEY':
  100. chdata['key'] = mode_value
  101. else:
  102. modes.append(ANOPE_MODENAME_TO_MODE.get(mode_name, ''))
  103. chdata['modes'] = ''.join(modes)
  104. # prevent subsequent ModeLock objects from modifying the mode list further:
  105. lastmode_channels.add(chname)
  106. out['channels'][chname] = chdata
  107. elif obj.type == 'ModeLock':
  108. if obj.kv.get('set') != '1':
  109. continue
  110. chname = obj.kv['ci']
  111. if chname in lastmode_channels:
  112. continue
  113. chdata = out['channels'][chname]
  114. modename = obj.kv['name']
  115. if modename == 'KEY':
  116. chdata['key'] = obj.kv['param']
  117. else:
  118. oragono_mode = ANOPE_MODENAME_TO_MODE.get(modename)
  119. if oragono_mode is not None:
  120. stored_modes = chdata.get('modes', '')
  121. stored_modes += oragono_mode
  122. chdata['modes'] = stored_modes
  123. elif obj.type == 'ChanAccess':
  124. chname = obj.kv['ci']
  125. target = obj.kv['mask']
  126. mode = access_level_to_amode(obj.kv['data'])
  127. if mode is None:
  128. continue
  129. if MASK_MAGIC_REGEX.search(target):
  130. continue
  131. chdata = out['channels'][chname]
  132. amode = chdata.setdefault('amode', {})
  133. amode[target] = mode
  134. chdata['amode'] = amode
  135. # do some basic integrity checks
  136. for chname, chdata in out['channels'].items():
  137. founder = chdata.get('founder')
  138. if founder not in out['users']:
  139. raise ValueError("no user corresponding to channel founder", chname, chdata.get('founder'))
  140. return out
  141. def main():
  142. if len(sys.argv) != 3:
  143. raise Exception("Usage: anope2json.py anope.db output.json")
  144. with open(sys.argv[1]) as infile:
  145. output = convert(infile)
  146. with open(sys.argv[2], 'w') as outfile:
  147. json.dump(output, outfile)
  148. if __name__ == '__main__':
  149. logging.basicConfig()
  150. sys.exit(main())