# -*- coding: utf-8 -*-
"""
Pokemon Crystal data de/compression.
"""

# A rundown of Pokemon Crystal's compression scheme:
#
# Control commands occupy bits 5-7.
# Bits 0-4 serve as the first parameter (n) for each command.
lz_commands = {
    'literal':   0, # n values for n bytes
    'iterate':   1, # one value for n bytes
    'alternate': 2, # alternate two values for n bytes
    'blank':     3, # zero for n bytes
}

# Repeater commands repeat any data that was just decompressed.
# They take an additional signed parameter (s) to mark a relative starting point.
# These wrap around (positive from the start, negative from the current position).
lz_commands.update({
    'repeat':    4, # n bytes starting from s
    'flip':      5, # n bytes in reverse bit order starting from s
    'reverse':   6, # n bytes backwards starting from s
})

# The long command is used when 5 bits aren't enough. Bits 2-4 contain a new control code.
# Bits 0-1 are appended to a new byte as 8-9, allowing a 10-bit parameter.
lz_commands.update({
    'long':      7, # n is now 10 bits for a new control code
})
max_length = 1 << 10 # can't go higher than 10 bits
lowmax     = 1 << 5  # standard 5-bit param

# If 0xff is encountered instead of a command, decompression ends.
lz_end = 0xff


# bit_flipped[b] is b with its eight bits in reverse order (0b11100100 -> 0b00100111).
bit_flipped = [
    sum(((byte >> i) & 1) << (7 - i) for i in range(8))
    for byte in range(0x100)
]


class Compressed:
    """
    Compress data into the lz format described at the top of this module.

    Usage:
        lz = Compressed(data).output
    or
        lz = Compressed().compress(data)
    or
        c = Compressed()
        c.data = data
        lz = c.compress()

    Positional arguments map, in order, onto the attributes named in
    self.arg_names (data, commands, debug, literal_only); keyword arguments
    are stored as attributes as-is.  The result is a list of ints ending
    with lz_end.

    find_lookback() is currently stubbed out, so the lookback commands
    (repeat, reverse, flip) always score 0 and are never emitted.

    There are some issues with reproducing the target compressor.
    Some notes are listed here:
        - the criteria for detecting a lookback is inconsistent
            - sometimes lookbacks that are mostly 0s are pruned, sometimes not
        - target appears to skip ahead if it can use a lookback soon, stopping the current command short or in some cases truncating it with literals.
            - this has been implemented, but the specifics are unknown
        - self.min_scores: It's unknown if blank's minimum score should be 1 or 2. Most likely it's 1, with some other hack to account for edge cases.
            - may be related to the above
        - target does not appear to compress backwards
    """

    def __init__(self, *args, **kwargs):
        # A command is only used when its run is strictly longer than this.
        self.min_scores = {
            'blank':     1,
            'iterate':   2,
            'alternate': 3,
            'repeat':    3,
            'reverse':   3,
            'flip':      3,
        }

        # Tie-break order between commands; earlier entries win.
        self.preference = [
            'repeat',
            'blank',
            'flip',
            'reverse',
            'iterate',
            'alternate',
            #'literal',
        ]

        self.lookback_methods = 'repeat', 'reverse', 'flip'

        # Defaults first, then keyword arguments, then positional arguments.
        self.__dict__.update({
            'data': None,
            'commands': lz_commands,
            'debug': False,
            'literal_only': False,
        })
        self.arg_names = 'data', 'commands', 'debug', 'literal_only'

        self.__dict__.update(kwargs)
        self.__dict__.update(dict(zip(self.arg_names, args)))

        if self.data is not None:
            self.compress()

    def compress(self, data=None):
        """
        Compress data (or self.data when data is None).

        The input is converted with bytearray(), the result is stored in
        self.output and also returned as a list of ints ending with lz_end.
        """
        if data is not None:
            self.data = data

        self.data = list(bytearray(self.data))

        # Caches used by the lookback search.
        self.indexes = {}
        self.lookbacks = {}
        for method in self.lookback_methods:
            self.lookbacks[method] = {}

        self.address = 0
        self.end = len(self.data)
        self.output = []
        self.literal = None  # start address of the pending literal run, if any

        while self.address < self.end:
            if self.score():
                # Flush any pending literal bytes before the winning command.
                self.do_literal()
                self.do_winner()
            else:
                # Nothing is worth a command here; extend the literal run.
                if self.literal is None:
                    self.literal = self.address
                self.address += 1

        self.do_literal()

        self.output += [lz_end]
        return self.output

    def reset_scores(self):
        """Zero the score of every method and clear offsets and helpers."""
        self.scores = {}
        self.offsets = {}
        self.helpers = {}
        for method in self.min_scores:
            self.scores[method] = 0

    def bit_flip(self, byte):
        """Return byte with its bits in reverse order."""
        return bit_flipped[byte]

    def do_literal(self):
        """
        Emit the pending literal run (if any) and clear it.

        A single command can carry at most max_length bytes, so longer runs
        are written as several consecutive literal commands; otherwise the
        length would overflow its 10 bits and clobber the command bits.
        """
        if self.literal is not None:
            length = abs(self.address - self.literal)
            start = min(self.literal, self.address + 1)
            stop = start + length
            for chunk_start in range(start, stop, max_length):
                chunk = self.data[chunk_start:min(chunk_start + max_length, stop)]
                self.helpers['literal'] = chunk
                self.do_cmd('literal', len(chunk))
            self.literal = None

    def _surplus(self, method, score):
        """
        Return how far score exceeds the minimum for method.

        Scores above lowmax need the two-byte long command, so they have to
        beat the minimum by one more.  A command is usable when this is > 0.
        """
        return score - self.min_scores[method] - int(score > lowmax)

    def score(self):
        """
        Score every command at self.address.

        Returns True if at least one command is worth emitting.
        """
        self.reset_scores()

        # An explicit loop: score_literal is called for its side effects.
        for method in ('iterate', 'alternate', 'blank'):
            self.score_literal(method)

        for method in self.lookback_methods:
            self.scores[method], self.offsets[method] = self.find_lookback(method, self.address)

        self.stop_short()

        return any(
            self._surplus(method, score) > 0
            for method, score in self.scores.items()
        )

    def stop_short(self):
        """
        If a lookback is close, reduce the scores of other commands.
        """
        best_method, best_score = max(
            self.scores.items(),
            key=lambda x: (
                x[1],
                -self.preference.index(x[0])
            )
        )
        for method in self.lookback_methods:
            min_score = self.min_scores[method]
            for address in range(self.address + 1, self.address + best_score):
                length, index = self.find_lookback(method, address)
                if length > max(min_score, best_score):
                    # BUG: lookbacks can reduce themselves. This appears to be a bug in the target also.
                    for m, score in list(self.scores.items()):
                        self.scores[m] = min(score, address - self.address)

    def read(self, address=None):
        """
        Return the byte at address (default self.address), or None when the
        address is outside the data.
        """
        if address is None:
            address = self.address
        if 0 <= address < len(self.data):
            return self.data[address]
        return None

    def find_all_lookbacks(self):
        """Run find_lookback for every method at every address."""
        for method in self.lookback_methods:
            for address, byte in enumerate(self.data):
                self.find_lookback(method, address)

    def find_lookback(self, method, address=None):
        """Temporarily stubbed, because the real function doesn't run in polynomial time."""
        return 0, None

    def broken_find_lookback(self, method, address=None):
        """
        Search earlier data for the best match for method at address.

        Returns (length, index); (0, None) when nothing matches.  Results
        are cached in self.lookbacks.  Not called by find_lookback, which is
        stubbed out.
        """
        if address is None:
            address = self.address

        existing = self.lookbacks.get(method, {}).get(address)
        if existing is not None:
            return existing

        lookback = 0, None

        # Better to not carelessly optimize at the moment.
        # (Disabled experiment: return early when address < 2.)

        byte = self.read(address)
        if byte is None:
            return lookback

        direction, mutate = {
            'repeat':  ( 1, int),
            'reverse': (-1, int),
            'flip':    ( 1, self.bit_flip),
        }[method]

        # Doesn't seem to help
        # (Disabled experiment: skip flip lookbacks that start on a zero byte.)

        data_len = len(self.data)

        def is_two_byte_index(index):
            # An index of None (no lookback yet) counts as two-byte, which is
            # what Python 2's "None < int" ordering produced here.
            return int(index is None or index < address - 0x7f)

        for index in self.get_indexes(mutate(byte)):

            if index >= address:
                break

            old_length, old_index = lookback
            if direction == 1:
                if old_length > data_len - index:
                    break
            else:
                if old_length > index:
                    continue

            if self.read(index) is None:
                continue

            length = 1 # we know there's at least one match, or we wouldn't be checking this index
            while True:
                this_byte = self.read(address + length)
                that_byte = self.read(index + length * direction)
                if that_byte is None or this_byte != mutate(that_byte):
                    break
                length += 1

            score = length - is_two_byte_index(index)
            old_score = old_length - is_two_byte_index(old_index)
            if score >= old_score or (score == old_score and length > old_length):
                # XXX maybe avoid two-byte indexes when possible
                if score >= lookback[0] - is_two_byte_index(lookback[1]):
                    lookback = length, index

        self.lookbacks[method][address] = lookback
        return lookback

    def get_indexes(self, byte):
        """Return (and cache) the ascending list of positions of byte in self.data."""
        if byte not in self.indexes:
            self.indexes[byte] = [
                index for index, value in enumerate(self.data) if value == byte
            ]
        return self.indexes[byte]

    def score_literal(self, method):
        """
        Score one of the pattern commands (blank, iterate, alternate) at
        self.address: the score is the length of the run matching the
        pattern, and the pattern bytes are stored in self.helpers.
        """
        address = self.address

        compare = {
            'blank': [0],
            'iterate': [self.read(address)],
            'alternate': [self.read(address), self.read(address + 1)],
        }[method]

        # XXX may or may not be correct
        if method == 'alternate' and compare[0] == 0:
            return

        length = 0
        while self.read(address + length) == compare[length % len(compare)]:
            length += 1

        self.scores[method] = length
        self.helpers[method] = compare

    def do_winner(self):
        """
        Emit the best-scoring usable command and advance past the data it covers.

        Candidates are ranked by surplus over their minimum score, then by
        self.preference.  The length is capped at max_length.
        """
        winners = [
            (method, score)
            for method, score in self.scores.items()
            if self._surplus(method, score) > 0
        ]
        winners.sort(
            key=lambda item: (
                -self._surplus(item[0], item[1]),
                self.preference.index(item[0])
            )
        )
        winner, score = winners[0]

        length = min(score, max_length)
        self.do_cmd(winner, length)
        self.address += length

    def do_cmd(self, cmd, length):
        """
        Append the encoding of cmd with the given length to self.output.

        length must be between 1 and max_length.  Lengths above lowmax use
        the two-byte long form.  The command's parameter bytes come from
        self.helpers, and lookback commands are followed by their offset.
        """
        start_address = self.address

        cmd_length = length - 1

        output = []

        if length > lowmax:
            output.append(
                (self.commands['long'] << 5)
              + (self.commands[cmd] << 2)
              + (cmd_length >> 8)
            )
            output.append(
                cmd_length & 0xff
            )
        else:
            output.append(
                (self.commands[cmd] << 5)
              + cmd_length
            )

        self.helpers['blank'] = [] # quick hack
        output += self.helpers.get(cmd, [])

        if cmd in self.lookback_methods:
            offset = self.offsets[cmd]
            # Negative offsets are one byte.
            # Positive offsets are two.
            # NOTE(review): a positive offset >= 0x8000 would have a high byte
            # >= 0x80 and look like a negative offset -- confirm inputs are small.
            if 0 < start_address - offset - 1 <= 0x7f:
                offset = (start_address - offset - 1) | 0x80
                output += [offset]
            else:
                output += [offset // 0x100, offset % 0x100] # big endian

        if self.debug:
            print(' '.join(map(str, [
                cmd, length, '\t',
                ' '.join(map('{:02x}'.format, output)),
                self.data[start_address:start_address+length] if cmd in self.lookback_methods else '',
            ])))

        self.output += output


class Decompressed:
    """
    Interpret and decompress lz-compressed data, usually 2bpp.

    Usage:
        data = Decompressed(lz).output
    or
        data = Decompressed().decompress(lz)
    or
        d = Decompressed()
        d.lz = lz
        data = d.decompress()

    To decompress from offset 0x80000 in a rom:
        data = Decompressed(rom, start=0x80000).output

    Positional arguments map, in order, onto the attributes named in
    arg_names; keyword arguments are stored as attributes as-is.
    """

    lz = None
    start = 0
    commands = lz_commands
    debug = False

    arg_names = 'lz', 'start', 'commands', 'debug'

    def __init__(self, *args, **kwargs):
        self.__dict__.update(dict(zip(self.arg_names, args)))
        self.__dict__.update(kwargs)

        # Reverse mapping: command number -> command name.
        self.command_names = {
            number: name for name, number in self.commands.items()
        }
        self.address = self.start

        if self.lz is not None:
            self.decompress()

        if self.debug:
            print(self.command_list())

    def command_list(self):
        """
        Return a text listing of the commands that were used. Useful for debugging.

        Each line shows the output address, the command name and length, the
        raw command bytes and, for lookback commands, the bytes that were
        copied.
        """

        text = ''

        output_address = 0
        for name, attrs in self.used_commands:
            length     = attrs['length']
            address    = attrs['address']
            offset     = attrs['offset']
            direction  = attrs['direction']

            text += '{2:03x} {0}: {1}'.format(name, length, output_address)
            text += '\t' + ' '.join(
                '{:02x}'.format(int(byte))
                for byte in self.lz[ address : address + attrs['cmd_length'] ]
            )

            if offset is not None:
                # Index one byte at a time, like _repeat does.  A slice with a
                # negative step gets a negative stop when a reverse copy
                # reaches index 0, and would return the wrong bytes.
                repeated_data = [
                    self.output[offset + i * direction]
                    for i in range(length)
                ]
                if name == 'flip':
                    repeated_data = [bit_flipped[byte] for byte in repeated_data]
                text += ' [' + ' '.join(map('{:02x}'.format, repeated_data)) + ']'

            text += '\n'
            output_address += length

        return text

    def decompress(self, lz=None):
        """
        Decompress lz (or self.lz when lz is None), reading from self.address
        until lz_end is found.

        The decompressed bytes are stored in self.output as a list of ints
        and returned; the consumed input is kept in self.compressed_data and
        the decoded commands in self.used_commands.
        """

        if lz is not None:
            self.lz = lz

        self.lz = bytearray(self.lz)

        self.used_commands = []
        self.output = []

        while True:

            cmd_address = self.address
            self.offset = None
            self.direction = None

            if self.byte == lz_end:
                self.next()
                break

            self.cmd = (self.byte & 0b11100000) >> 5

            if self.cmd_name == 'long':
                # 10-bit length
                self.cmd = (self.byte & 0b00011100) >> 2
                self.length = (self.next() & 0b00000011) * 0x100
                self.length += self.next() + 1
            else:
                # 5-bit length
                self.length = (self.next() & 0b00011111) + 1

            # Each command name is also the name of the method implementing it.
            getattr(self, self.cmd_name)()

            self.used_commands += [(
                self.cmd_name,
                {
                    'length':     self.length,
                    'address':    cmd_address,
                    'offset':     self.offset,
                    'cmd_length': self.address - cmd_address,
                    'direction':  self.direction,
                }
            )]

        # Keep track of the data we just decompressed.
        self.compressed_data = self.lz[self.start : self.address]

        return self.output

    @property
    def byte(self):
        """The input byte at the current address."""
        return self.lz[ self.address ]

    def next(self):
        """Return the current input byte and advance the address by one."""
        byte = self.byte
        self.address += 1
        return byte

    @property
    def cmd_name(self):
        """Name of the current command, or None if its number is unknown."""
        return self.command_names.get(self.cmd)

    def get_offset(self):
        """
        Read a lookback offset into self.offset.

        A first byte >= 0x80 is a one-byte offset counted back from the end
        of the output so far; otherwise the offset is a two-byte big-endian
        position from the start of the output.
        """
        if self.byte >= 0x80:
            # negative
            offset = self.next() & 0x7f
            offset = len(self.output) - offset - 1
        else:
            # positive
            offset =  self.next() * 0x100
            offset += self.next()

        self.offset = offset

    def literal(self):
        """
        Copy data directly.
        """
        self.output += self.lz[ self.address : self.address + self.length ]
        self.address += self.length

    def iterate(self):
        """
        Write one byte repeatedly.
        """
        self.output += [self.next()] * self.length

    def alternate(self):
        """
        Write alternating bytes.
        """
        alts = [self.next(), self.next()]
        self.output += [ alts[x & 1] for x in range(self.length) ]

    def blank(self):
        """
        Write zeros.
        """
        self.output += [0] * self.length

    def flip(self):
        """
        Repeat flipped bytes from output.

        Example: 11100100 -> 00100111
        """
        self._repeat(table=bit_flipped)

    def reverse(self):
        """
        Repeat reversed bytes from output.
        """
        self._repeat(direction=-1)

    def repeat(self):
        """
        Repeat bytes from output.
        """
        self._repeat()

    def _repeat(self, direction=1, table=None):
        """
        Copy self.length bytes from the output, starting at the offset read
        from the input and stepping by direction; each byte is mapped through
        table when one is given.
        """
        self.get_offset()
        self.direction = direction
        # Note: appends must be one at a time (this way, repeats can draw from themselves if required)
        for i in range(self.length):
            byte = self.output[ self.offset + i * direction ]
            self.output.append( table[byte] if table else byte )