qemu/tests/qemu-iotests/295
<<
>>
Prefs
   1#!/usr/bin/env python3
   2# group: rw
   3#
# Test case for QMP's encryption key management
   5#
   6# Copyright (C) 2019 Red Hat, Inc.
   7#
   8# This program is free software; you can redistribute it and/or modify
   9# it under the terms of the GNU General Public License as published by
  10# the Free Software Foundation; either version 2 of the License, or
  11# (at your option) any later version.
  12#
  13# This program is distributed in the hope that it will be useful,
  14# but WITHOUT ANY WARRANTY; without even the implied warranty of
  15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16# GNU General Public License for more details.
  17#
  18# You should have received a copy of the GNU General Public License
  19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
  20#
  21
  22import iotests
  23import os
  24import time
  25import json
  26
# Scratch image inside the iotests test directory; the test cases below
# create it, open it in the VM and delete it again.
test_img = os.path.join(iotests.test_dir, 'test.img')
  28
  29class Secret:
  30    def __init__(self, index):
  31        self._id = "keysec" + str(index)
  32        # you are not supposed to see the password...
  33        self._secret = "hunter" + str(index)
  34
  35    def id(self):
  36        return self._id
  37
  38    def secret(self):
  39        return self._secret
  40
  41    def to_cmdline_object(self):
  42        return  [ "secret,id=" + self._id + ",data=" + self._secret]
  43
  44    def to_qmp_object(self):
  45        return { "qom_type" : "secret", "id": self.id(),
  46                 "data": self.secret() }
  47
  48################################################################################
  49class EncryptionSetupTestCase(iotests.QMPTestCase):
  50
  51    # test case startup
  52    def setUp(self):
  53        # start the VM
  54        self.vm = iotests.VM()
  55        self.vm.launch()
  56
  57        # create the secrets and load 'em into the VM
  58        self.secrets = [ Secret(i) for i in range(0, 6) ]
  59        for secret in self.secrets:
  60            result = self.vm.qmp("object-add", **secret.to_qmp_object())
  61            self.assert_qmp(result, 'return', {})
  62
  63        if iotests.imgfmt == "qcow2":
  64            self.pfx = "encrypt."
  65            self.img_opts = [ '-o', "encrypt.format=luks" ]
  66        else:
  67            self.pfx = ""
  68            self.img_opts = []
  69
  70    # test case shutdown
  71    def tearDown(self):
  72        # stop the VM
  73        self.vm.shutdown()
  74
  75    ###########################################################################
  76    # create the encrypted block device
  77    def createImg(self, file, secret):
  78
  79        iotests.qemu_img(
  80            'create',
  81            '--object', *secret.to_cmdline_object(),
  82            '-f', iotests.imgfmt,
  83            '-o', self.pfx + 'key-secret=' + secret.id(),
  84            '-o', self.pfx + 'iter-time=10',
  85            *self.img_opts,
  86            file,
  87            '1M')
  88
  89    ###########################################################################
  90    # open an encrypted block device
  91    def openImageQmp(self, id, file, secret, read_only = False):
  92
  93        encrypt_options = {
  94            'key-secret' : secret.id()
  95        }
  96
  97        if iotests.imgfmt == "qcow2":
  98            encrypt_options = {
  99                'encrypt': {
 100                    'format':'luks',
 101                    **encrypt_options
 102                }
 103            }
 104
 105        result = self.vm.qmp('blockdev-add', **
 106            {
 107                'driver': iotests.imgfmt,
 108                'node-name': id,
 109                'read-only': read_only,
 110
 111                **encrypt_options,
 112
 113                'file': {
 114                    'driver': 'file',
 115                    'filename': test_img,
 116                }
 117            }
 118        )
 119        self.assert_qmp(result, 'return', {})
 120
 121    # close the encrypted block device
 122    def closeImageQmp(self, id):
 123        result = self.vm.qmp('blockdev-del', **{ 'node-name': id })
 124        self.assert_qmp(result, 'return', {})
 125
 126    ###########################################################################
 127    # add a key to an encrypted block device
 128    def addKeyQmp(self, id, new_secret, secret = None,
 129                  slot = None, force = False):
 130
 131        crypt_options = {
 132            'state'      : 'active',
 133            'new-secret' : new_secret.id(),
 134            'iter-time' : 10
 135        }
 136
 137        if slot != None:
 138            crypt_options['keyslot'] = slot
 139
 140
 141        if secret != None:
 142            crypt_options['secret'] = secret.id()
 143
 144        if iotests.imgfmt == "qcow2":
 145            crypt_options['format'] = 'luks'
 146            crypt_options = {
 147                'encrypt': crypt_options
 148            }
 149
 150        args = {
 151            'node-name': id,
 152            'job-id' : 'job_add_key',
 153            'options' : {
 154                    'driver' : iotests.imgfmt,
 155                    **crypt_options
 156                },
 157        }
 158
 159        if force == True:
 160            args['force'] = True
 161
 162        #TODO: check what jobs return
 163        result = self.vm.qmp('x-blockdev-amend', **args)
 164        assert result['return'] == {}
 165        self.vm.run_job('job_add_key')
 166
 167    # erase a key from an encrypted block device
 168    def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
 169
 170        crypt_options = {
 171            'state'      : 'inactive',
 172        }
 173
 174        if slot != None:
 175            crypt_options['keyslot'] = slot
 176        if old_secret != None:
 177            crypt_options['old-secret'] = old_secret.id()
 178
 179        if iotests.imgfmt == "qcow2":
 180            crypt_options['format'] = 'luks'
 181            crypt_options = {
 182                'encrypt': crypt_options
 183            }
 184
 185        args = {
 186            'node-name': id,
 187            'job-id' : 'job_erase_key',
 188            'options' : {
 189                    'driver' : iotests.imgfmt,
 190                    **crypt_options
 191                },
 192        }
 193
 194        if force == True:
 195            args['force'] = True
 196
 197        result = self.vm.qmp('x-blockdev-amend', **args)
 198        assert result['return'] == {}
 199        self.vm.run_job('job_erase_key')
 200
 201    ###########################################################################
 202    # create image, and change its key
 203    def testChangeKey(self):
 204
 205        # create the image with secret0 and open it
 206        self.createImg(test_img, self.secrets[0]);
 207        self.openImageQmp("testdev", test_img, self.secrets[0])
 208
 209        # add key to slot 1
 210        self.addKeyQmp("testdev", new_secret = self.secrets[1])
 211
 212        # add key to slot 5
 213        self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)
 214
 215        # erase key from slot 0
 216        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
 217
 218        #reopen the image with secret1
 219        self.closeImageQmp("testdev")
 220        self.openImageQmp("testdev", test_img, self.secrets[1])
 221
 222        # close and erase the image for good
 223        self.closeImageQmp("testdev")
 224        os.remove(test_img)
 225
 226    # test that if we erase the old password,
 227    # we can still change the encryption keys using 'old-secret'
 228    def testOldPassword(self):
 229
 230        # create the image with secret0 and open it
 231        self.createImg(test_img, self.secrets[0]);
 232        self.openImageQmp("testdev", test_img, self.secrets[0])
 233
 234        # add key to slot 1
 235        self.addKeyQmp("testdev", new_secret = self.secrets[1])
 236
 237        # erase key from slot 0
 238        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
 239
 240        # this will fail as the old password is no longer valid
 241        self.addKeyQmp("testdev", new_secret = self.secrets[2])
 242
 243        # this will work
 244        self.addKeyQmp("testdev", new_secret = self.secrets[2], secret = self.secrets[1])
 245
 246        # close and erase the image for good
 247        self.closeImageQmp("testdev")
 248        os.remove(test_img)
 249
 250    def testUseForceLuke(self):
 251
 252        self.createImg(test_img, self.secrets[0]);
 253        self.openImageQmp("testdev", test_img, self.secrets[0])
 254
 255        # Add bunch of secrets
 256        self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
 257        self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)
 258
 259        # overwrite an active secret
 260        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
 261        self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2, force=True)
 262
 263        self.addKeyQmp("testdev", new_secret = self.secrets[0])
 264
 265        # Now erase all the secrets
 266        self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
 267        self.eraseKeyQmp("testdev", slot=4)
 268
 269        # erase last keyslot
 270        self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
 271        self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)
 272
 273        self.closeImageQmp("testdev")
 274        os.remove(test_img)
 275
 276
if __name__ == '__main__':
    # NOTE(review): presumably stops the test early when LUKS cannot be
    # used on this host -- confirm against iotests.verify_working_luks()
    iotests.verify_working_luks()
    # NOTE(review): presumably needed so that the output of vm.run_job()
    # shows up in the test log -- confirm against iotests.activate_logging()
    iotests.activate_logging()
    # Only the formats with encryption support are run: qcow2 and luks
    iotests.main(supported_fmts = ['qcow2', 'luks'])
 282