qemu/tests/qemu-iotests/qcow2_format.py
<<
>>
Prefs
   1# Library for manipulations with qcow2 image
   2#
   3# Copyright (c) 2020 Virtuozzo International GmbH.
   4# Copyright (C) 2012 Red Hat, Inc.
   5#
   6# This program is free software; you can redistribute it and/or modify
   7# it under the terms of the GNU General Public License as published by
   8# the Free Software Foundation; either version 2 of the License, or
   9# (at your option) any later version.
  10#
  11# This program is distributed in the hope that it will be useful,
  12# but WITHOUT ANY WARRANTY; without even the implied warranty of
  13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14# GNU General Public License for more details.
  15#
  16# You should have received a copy of the GNU General Public License
  17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
  18#
  19
  20import struct
  21import string
  22import json
  23
  24
  25class ComplexEncoder(json.JSONEncoder):
  26    def default(self, obj):
  27        if hasattr(obj, 'to_json'):
  28            return obj.to_json()
  29        else:
  30            return json.JSONEncoder.default(self, obj)
  31
  32
  33class Qcow2Field:
  34
  35    def __init__(self, value):
  36        self.value = value
  37
  38    def __str__(self):
  39        return str(self.value)
  40
  41
  42class Flags64(Qcow2Field):
  43
  44    def __str__(self):
  45        bits = []
  46        for bit in range(64):
  47            if self.value & (1 << bit):
  48                bits.append(bit)
  49        return str(bits)
  50
  51
  52class BitmapFlags(Qcow2Field):
  53
  54    flags = {
  55        0x1: 'in-use',
  56        0x2: 'auto'
  57    }
  58
  59    def __str__(self):
  60        bits = []
  61        for bit in range(64):
  62            flag = self.value & (1 << bit)
  63            if flag:
  64                bits.append(self.flags.get(flag, f'bit-{bit}'))
  65        return f'{self.value:#x} ({bits})'
  66
  67
  68class Enum(Qcow2Field):
  69
  70    def __str__(self):
  71        return f'{self.value:#x} ({self.mapping.get(self.value, "<unknown>")})'
  72
  73
  74class Qcow2StructMeta(type):
  75
  76    # Mapping from c types to python struct format
  77    ctypes = {
  78        'u8': 'B',
  79        'u16': 'H',
  80        'u32': 'I',
  81        'u64': 'Q'
  82    }
  83
  84    def __init__(self, name, bases, attrs):
  85        if 'fields' in attrs:
  86            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
  87
  88
  89class Qcow2Struct(metaclass=Qcow2StructMeta):
  90
  91    """Qcow2Struct: base class for qcow2 data structures
  92
  93    Successors should define fields class variable, which is: list of tuples,
  94    each of three elements:
  95        - c-type (one of 'u8', 'u16', 'u32', 'u64')
  96        - format (format_spec to use with .format() when dump or 'mask' to dump
  97                  bitmasks)
  98        - field name
  99    """
 100
 101    def __init__(self, fd=None, offset=None, data=None):
 102        """
 103        Two variants:
 104            1. Specify data. fd and offset must be None.
 105            2. Specify fd and offset, data must be None. offset may be omitted
 106               in this case, than current position of fd is used.
 107        """
 108        if data is None:
 109            assert fd is not None
 110            buf_size = struct.calcsize(self.fmt)
 111            if offset is not None:
 112                fd.seek(offset)
 113            data = fd.read(buf_size)
 114        else:
 115            assert fd is None and offset is None
 116
 117        values = struct.unpack(self.fmt, data)
 118        self.__dict__ = dict((field[2], values[i])
 119                             for i, field in enumerate(self.fields))
 120
 121    def dump(self, is_json=False):
 122        if is_json:
 123            print(json.dumps(self.to_json(), indent=4, cls=ComplexEncoder))
 124            return
 125
 126        for f in self.fields:
 127            value = self.__dict__[f[2]]
 128            if isinstance(f[1], str):
 129                value_str = f[1].format(value)
 130            else:
 131                value_str = str(f[1](value))
 132
 133            print('{:<25} {}'.format(f[2], value_str))
 134
 135    def to_json(self):
 136        return dict((f[2], self.__dict__[f[2]]) for f in self.fields)
 137
 138
 139class Qcow2BitmapExt(Qcow2Struct):
 140
 141    fields = (
 142        ('u32', '{}', 'nb_bitmaps'),
 143        ('u32', '{}', 'reserved32'),
 144        ('u64', '{:#x}', 'bitmap_directory_size'),
 145        ('u64', '{:#x}', 'bitmap_directory_offset')
 146    )
 147
 148    def __init__(self, fd, cluster_size):
 149        super().__init__(fd=fd)
 150        tail = struct.calcsize(self.fmt) % 8
 151        if tail:
 152            fd.seek(8 - tail, 1)
 153        position = fd.tell()
 154        self.cluster_size = cluster_size
 155        self.read_bitmap_directory(fd)
 156        fd.seek(position)
 157
 158    def read_bitmap_directory(self, fd):
 159        fd.seek(self.bitmap_directory_offset)
 160        self.bitmap_directory = \
 161            [Qcow2BitmapDirEntry(fd, cluster_size=self.cluster_size)
 162             for _ in range(self.nb_bitmaps)]
 163
 164    def dump(self):
 165        super().dump()
 166        for entry in self.bitmap_directory:
 167            print()
 168            entry.dump()
 169
 170    def to_json(self):
 171        fields_dict = super().to_json()
 172        fields_dict['bitmap_directory'] = self.bitmap_directory
 173        return fields_dict
 174
 175
 176class Qcow2BitmapDirEntry(Qcow2Struct):
 177
 178    fields = (
 179        ('u64', '{:#x}', 'bitmap_table_offset'),
 180        ('u32', '{}', 'bitmap_table_size'),
 181        ('u32', BitmapFlags, 'flags'),
 182        ('u8',  '{}', 'type'),
 183        ('u8',  '{}', 'granularity_bits'),
 184        ('u16', '{}', 'name_size'),
 185        ('u32', '{}', 'extra_data_size')
 186    )
 187
 188    def __init__(self, fd, cluster_size):
 189        super().__init__(fd=fd)
 190        self.cluster_size = cluster_size
 191        # Seek relative to the current position in the file
 192        fd.seek(self.extra_data_size, 1)
 193        bitmap_name = fd.read(self.name_size)
 194        self.name = bitmap_name.decode('ascii')
 195        # Move position to the end of the entry in the directory
 196        entry_raw_size = self.bitmap_dir_entry_raw_size()
 197        padding = ((entry_raw_size + 7) & ~7) - entry_raw_size
 198        fd.seek(padding, 1)
 199        self.bitmap_table = Qcow2BitmapTable(fd=fd,
 200                                             offset=self.bitmap_table_offset,
 201                                             nb_entries=self.bitmap_table_size,
 202                                             cluster_size=self.cluster_size)
 203
 204    def bitmap_dir_entry_raw_size(self):
 205        return struct.calcsize(self.fmt) + self.name_size + \
 206            self.extra_data_size
 207
 208    def dump(self):
 209        print(f'{"Bitmap name":<25} {self.name}')
 210        super(Qcow2BitmapDirEntry, self).dump()
 211        self.bitmap_table.dump()
 212
 213    def to_json(self):
 214        # Put the name ahead of the dict
 215        return {
 216            'name': self.name,
 217            **super().to_json(),
 218            'bitmap_table': self.bitmap_table
 219        }
 220
 221
 222class Qcow2BitmapTableEntry(Qcow2Struct):
 223
 224    fields = (
 225        ('u64',  '{}', 'entry'),
 226    )
 227
 228    BME_TABLE_ENTRY_RESERVED_MASK = 0xff000000000001fe
 229    BME_TABLE_ENTRY_OFFSET_MASK = 0x00fffffffffffe00
 230    BME_TABLE_ENTRY_FLAG_ALL_ONES = 1
 231
 232    def __init__(self, fd):
 233        super().__init__(fd=fd)
 234        self.reserved = self.entry & self.BME_TABLE_ENTRY_RESERVED_MASK
 235        self.offset = self.entry & self.BME_TABLE_ENTRY_OFFSET_MASK
 236        if self.offset:
 237            if self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
 238                self.type = 'invalid'
 239            else:
 240                self.type = 'serialized'
 241        elif self.entry & self.BME_TABLE_ENTRY_FLAG_ALL_ONES:
 242            self.type = 'all-ones'
 243        else:
 244            self.type = 'all-zeroes'
 245
 246    def to_json(self):
 247        return {'type': self.type, 'offset': self.offset,
 248                'reserved': self.reserved}
 249
 250
 251class Qcow2BitmapTable:
 252
 253    def __init__(self, fd, offset, nb_entries, cluster_size):
 254        self.cluster_size = cluster_size
 255        position = fd.tell()
 256        fd.seek(offset)
 257        self.entries = [Qcow2BitmapTableEntry(fd) for _ in range(nb_entries)]
 258        fd.seek(position)
 259
 260    def dump(self):
 261        bitmap_table = enumerate(self.entries)
 262        print(f'{"Bitmap table":<14} {"type":<15} {"size":<12} {"offset"}')
 263        for i, entry in bitmap_table:
 264            if entry.type == 'serialized':
 265                size = self.cluster_size
 266            else:
 267                size = 0
 268            print(f'{i:<14} {entry.type:<15} {size:<12} {entry.offset}')
 269
 270    def to_json(self):
 271        return self.entries
 272
 273
# Header extension magic for which QcowHeaderExtension parses the payload
# as a Qcow2BitmapExt (shown as 'Bitmaps' when dumped)
QCOW2_EXT_MAGIC_BITMAPS = 0x23852875
 275
 276
 277class QcowHeaderExtension(Qcow2Struct):
 278
 279    class Magic(Enum):
 280        mapping = {
 281            0xe2792aca: 'Backing format',
 282            0x6803f857: 'Feature table',
 283            0x0537be77: 'Crypto header',
 284            QCOW2_EXT_MAGIC_BITMAPS: 'Bitmaps',
 285            0x44415441: 'Data file'
 286        }
 287
 288        def to_json(self):
 289            return self.mapping.get(self.value, "<unknown>")
 290
 291    fields = (
 292        ('u32', Magic, 'magic'),
 293        ('u32', '{}', 'length')
 294        # length bytes of data follows
 295        # then padding to next multiply of 8
 296    )
 297
 298    def __init__(self, magic=None, length=None, data=None, fd=None,
 299                 cluster_size=None):
 300        """
 301        Support both loading from fd and creation from user data.
 302        For fd-based creation current position in a file will be used to read
 303        the data.
 304        The cluster_size value may be obtained by dependent structures.
 305
 306        This should be somehow refactored and functionality should be moved to
 307        superclass (to allow creation of any qcow2 struct), but then, fields
 308        of variable length (data here) should be supported in base class
 309        somehow. Note also, that we probably want to parse different
 310        extensions. Should they be subclasses of this class, or how to do it
 311        better? Should it be something like QAPI union with discriminator field
 312        (magic here). So, it's a TODO. We'll see how to properly refactor this
 313        when we have more qcow2 structures.
 314        """
 315        if fd is None:
 316            assert all(v is not None for v in (magic, length, data))
 317            self.magic = magic
 318            self.length = length
 319            if length % 8 != 0:
 320                padding = 8 - (length % 8)
 321                data += b'\0' * padding
 322            self.data = data
 323        else:
 324            assert all(v is None for v in (magic, length, data))
 325            super().__init__(fd=fd)
 326            if self.magic == QCOW2_EXT_MAGIC_BITMAPS:
 327                self.obj = Qcow2BitmapExt(fd=fd, cluster_size=cluster_size)
 328                self.data = None
 329            else:
 330                padded = (self.length + 7) & ~7
 331                self.data = fd.read(padded)
 332                assert self.data is not None
 333                self.obj = None
 334
 335        if self.data is not None:
 336            data_str = self.data[:self.length]
 337            if all(c in string.printable.encode(
 338                'ascii') for c in data_str):
 339                data_str = f"'{ data_str.decode('ascii') }'"
 340            else:
 341                data_str = '<binary>'
 342            self.data_str = data_str
 343
 344
 345    def dump(self):
 346        super().dump()
 347
 348        if self.obj is None:
 349            print(f'{"data":<25} {self.data_str}')
 350        else:
 351            self.obj.dump()
 352
 353    def to_json(self):
 354        # Put the name ahead of the dict
 355        res = {'name': self.Magic(self.magic), **super().to_json()}
 356        if self.obj is not None:
 357            res['data'] = self.obj
 358        else:
 359            res['data_str'] = self.data_str
 360
 361        return res
 362
 363    @classmethod
 364    def create(cls, magic, data):
 365        return QcowHeaderExtension(magic, len(data), data)
 366
 367
 368class QcowHeader(Qcow2Struct):
 369
 370    fields = (
 371        # Version 2 header fields
 372        ('u32', '{:#x}', 'magic'),
 373        ('u32', '{}', 'version'),
 374        ('u64', '{:#x}', 'backing_file_offset'),
 375        ('u32', '{:#x}', 'backing_file_size'),
 376        ('u32', '{}', 'cluster_bits'),
 377        ('u64', '{}', 'size'),
 378        ('u32', '{}', 'crypt_method'),
 379        ('u32', '{}', 'l1_size'),
 380        ('u64', '{:#x}', 'l1_table_offset'),
 381        ('u64', '{:#x}', 'refcount_table_offset'),
 382        ('u32', '{}', 'refcount_table_clusters'),
 383        ('u32', '{}', 'nb_snapshots'),
 384        ('u64', '{:#x}', 'snapshot_offset'),
 385
 386        # Version 3 header fields
 387        ('u64', Flags64, 'incompatible_features'),
 388        ('u64', Flags64, 'compatible_features'),
 389        ('u64', Flags64, 'autoclear_features'),
 390        ('u32', '{}', 'refcount_order'),
 391        ('u32', '{}', 'header_length'),
 392    )
 393
 394    def __init__(self, fd):
 395        super().__init__(fd=fd, offset=0)
 396
 397        self.set_defaults()
 398        self.cluster_size = 1 << self.cluster_bits
 399
 400        fd.seek(self.header_length)
 401        self.load_extensions(fd)
 402
 403        if self.backing_file_offset:
 404            fd.seek(self.backing_file_offset)
 405            self.backing_file = fd.read(self.backing_file_size)
 406        else:
 407            self.backing_file = None
 408
 409    def set_defaults(self):
 410        if self.version == 2:
 411            self.incompatible_features = 0
 412            self.compatible_features = 0
 413            self.autoclear_features = 0
 414            self.refcount_order = 4
 415            self.header_length = 72
 416
 417    def load_extensions(self, fd):
 418        self.extensions = []
 419
 420        if self.backing_file_offset != 0:
 421            end = min(self.cluster_size, self.backing_file_offset)
 422        else:
 423            end = self.cluster_size
 424
 425        while fd.tell() < end:
 426            ext = QcowHeaderExtension(fd=fd, cluster_size=self.cluster_size)
 427            if ext.magic == 0:
 428                break
 429            else:
 430                self.extensions.append(ext)
 431
 432    def update_extensions(self, fd):
 433
 434        fd.seek(self.header_length)
 435        extensions = self.extensions
 436        extensions.append(QcowHeaderExtension(0, 0, b''))
 437        for ex in extensions:
 438            buf = struct.pack('>II', ex.magic, ex.length)
 439            fd.write(buf)
 440            fd.write(ex.data)
 441
 442        if self.backing_file is not None:
 443            self.backing_file_offset = fd.tell()
 444            fd.write(self.backing_file)
 445
 446        if fd.tell() > self.cluster_size:
 447            raise Exception('I think I just broke the image...')
 448
 449    def update(self, fd):
 450        header_bytes = self.header_length
 451
 452        self.update_extensions(fd)
 453
 454        fd.seek(0)
 455        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
 456        buf = struct.pack(QcowHeader.fmt, *header)
 457        buf = buf[0:header_bytes-1]
 458        fd.write(buf)
 459
 460    def dump_extensions(self, is_json=False):
 461        if is_json:
 462            print(json.dumps(self.extensions, indent=4, cls=ComplexEncoder))
 463            return
 464
 465        for ex in self.extensions:
 466            print('Header extension:')
 467            ex.dump()
 468            print()
 469