260 lines
9.5 KiB
Python
260 lines
9.5 KiB
Python
#!/usr/bin/env python3
|
|
#
|
|
# Tests for internal snapshot.
|
|
#
|
|
# Copyright (C) 2013 IBM, Inc.
|
|
#
|
|
# Based on 055.
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
|
|
import time
|
|
import os
|
|
import iotests
|
|
from iotests import qemu_img, qemu_io
|
|
|
|
test_drv_base_name = 'drive'
|
|
|
|
class ImageSnapshotTestCase(iotests.QMPTestCase):
|
|
image_len = 120 * 1024 * 1024 # MB
|
|
|
|
def __init__(self, *args):
|
|
self.expect = []
|
|
super(ImageSnapshotTestCase, self).__init__(*args)
|
|
|
|
def _setUp(self, test_img_base_name, image_num):
|
|
self.vm = iotests.VM()
|
|
for i in range(0, image_num):
|
|
filename = '%s%d' % (test_img_base_name, i)
|
|
img = os.path.join(iotests.test_dir, filename)
|
|
device = '%s%d' % (test_drv_base_name, i)
|
|
qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
|
|
self.vm.add_drive(img)
|
|
self.expect.append({'image': img, 'device': device,
|
|
'snapshots': [],
|
|
'snapshots_name_counter': 0})
|
|
self.vm.launch()
|
|
|
|
def tearDown(self):
|
|
self.vm.shutdown()
|
|
for dev_expect in self.expect:
|
|
os.remove(dev_expect['image'])
|
|
|
|
def createSnapshotInTransaction(self, snapshot_num, abort = False):
|
|
actions = []
|
|
for dev_expect in self.expect:
|
|
num = dev_expect['snapshots_name_counter']
|
|
for j in range(0, snapshot_num):
|
|
name = '%s_sn%d' % (dev_expect['device'], num)
|
|
num = num + 1
|
|
if abort == False:
|
|
dev_expect['snapshots'].append({'name': name})
|
|
dev_expect['snapshots_name_counter'] = num
|
|
actions.append({
|
|
'type': 'blockdev-snapshot-internal-sync',
|
|
'data': { 'device': dev_expect['device'],
|
|
'name': name },
|
|
})
|
|
|
|
if abort == True:
|
|
actions.append({
|
|
'type': 'abort',
|
|
'data': {},
|
|
})
|
|
|
|
result = self.vm.qmp('transaction', actions = actions)
|
|
|
|
if abort == True:
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
else:
|
|
self.assert_qmp(result, 'return', {})
|
|
|
|
def verifySnapshotInfo(self):
|
|
result = self.vm.qmp('query-block')
|
|
|
|
# Verify each expected result
|
|
for dev_expect in self.expect:
|
|
# 1. Find the returned image value and snapshot info
|
|
image_result = None
|
|
for device in result['return']:
|
|
if device['device'] == dev_expect['device']:
|
|
image_result = device['inserted']['image']
|
|
break
|
|
self.assertTrue(image_result != None)
|
|
# Do not consider zero snapshot case now
|
|
sn_list_result = image_result['snapshots']
|
|
sn_list_expect = dev_expect['snapshots']
|
|
|
|
# 2. Verify it with expect
|
|
self.assertTrue(len(sn_list_result) == len(sn_list_expect))
|
|
|
|
for sn_expect in sn_list_expect:
|
|
sn_result = None
|
|
for sn in sn_list_result:
|
|
if sn_expect['name'] == sn['name']:
|
|
sn_result = sn
|
|
break
|
|
self.assertTrue(sn_result != None)
|
|
# Fill in the detail info
|
|
sn_expect.update(sn_result)
|
|
|
|
def deleteSnapshot(self, device, id = None, name = None):
|
|
sn_list_expect = None
|
|
sn_expect = None
|
|
|
|
self.assertTrue(id != None or name != None)
|
|
|
|
# Fill in the detail info include ID
|
|
self.verifySnapshotInfo()
|
|
|
|
#find the expected snapshot list
|
|
for dev_expect in self.expect:
|
|
if dev_expect['device'] == device:
|
|
sn_list_expect = dev_expect['snapshots']
|
|
break
|
|
self.assertTrue(sn_list_expect != None)
|
|
|
|
if id != None and name != None:
|
|
for sn in sn_list_expect:
|
|
if sn['id'] == id and sn['name'] == name:
|
|
sn_expect = sn
|
|
result = \
|
|
self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = device,
|
|
id = id,
|
|
name = name)
|
|
break
|
|
elif id != None:
|
|
for sn in sn_list_expect:
|
|
if sn['id'] == id:
|
|
sn_expect = sn
|
|
result = \
|
|
self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = device,
|
|
id = id)
|
|
break
|
|
else:
|
|
for sn in sn_list_expect:
|
|
if sn['name'] == name:
|
|
sn_expect = sn
|
|
result = \
|
|
self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = device,
|
|
name = name)
|
|
break
|
|
|
|
self.assertTrue(sn_expect != None)
|
|
|
|
self.assert_qmp(result, 'return', sn_expect)
|
|
sn_list_expect.remove(sn_expect)
|
|
|
|
class TestSingleTransaction(ImageSnapshotTestCase):
|
|
def setUp(self):
|
|
self._setUp('test_a.img', 1)
|
|
|
|
def test_create(self):
|
|
self.createSnapshotInTransaction(1)
|
|
self.verifySnapshotInfo()
|
|
|
|
def test_error_name_empty(self):
|
|
actions = [{'type': 'blockdev-snapshot-internal-sync',
|
|
'data': { 'device': self.expect[0]['device'],
|
|
'name': '' },
|
|
}]
|
|
result = self.vm.qmp('transaction', actions = actions)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
def test_error_device(self):
|
|
actions = [{'type': 'blockdev-snapshot-internal-sync',
|
|
'data': { 'device': 'drive_error',
|
|
'name': 'a' },
|
|
}]
|
|
result = self.vm.qmp('transaction', actions = actions)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
def test_error_exist(self):
|
|
self.createSnapshotInTransaction(1)
|
|
self.verifySnapshotInfo()
|
|
actions = [{'type': 'blockdev-snapshot-internal-sync',
|
|
'data': { 'device': self.expect[0]['device'],
|
|
'name': self.expect[0]['snapshots'][0] },
|
|
}]
|
|
result = self.vm.qmp('transaction', actions = actions)
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
class TestMultipleTransaction(ImageSnapshotTestCase):
|
|
def setUp(self):
|
|
self._setUp('test_b.img', 2)
|
|
|
|
def test_create(self):
|
|
self.createSnapshotInTransaction(3)
|
|
self.verifySnapshotInfo()
|
|
|
|
def test_abort(self):
|
|
self.createSnapshotInTransaction(2)
|
|
self.verifySnapshotInfo()
|
|
self.createSnapshotInTransaction(3, abort = True)
|
|
self.verifySnapshotInfo()
|
|
|
|
class TestSnapshotDelete(ImageSnapshotTestCase):
|
|
def setUp(self):
|
|
self._setUp('test_c.img', 1)
|
|
|
|
def test_delete_with_id(self):
|
|
self.createSnapshotInTransaction(2)
|
|
self.verifySnapshotInfo()
|
|
self.deleteSnapshot(self.expect[0]['device'],
|
|
id = self.expect[0]['snapshots'][0]['id'])
|
|
self.verifySnapshotInfo()
|
|
|
|
def test_delete_with_name(self):
|
|
self.createSnapshotInTransaction(3)
|
|
self.verifySnapshotInfo()
|
|
self.deleteSnapshot(self.expect[0]['device'],
|
|
name = self.expect[0]['snapshots'][1]['name'])
|
|
self.verifySnapshotInfo()
|
|
|
|
def test_delete_with_id_and_name(self):
|
|
self.createSnapshotInTransaction(4)
|
|
self.verifySnapshotInfo()
|
|
self.deleteSnapshot(self.expect[0]['device'],
|
|
id = self.expect[0]['snapshots'][2]['id'],
|
|
name = self.expect[0]['snapshots'][2]['name'])
|
|
self.verifySnapshotInfo()
|
|
|
|
|
|
def test_error_device(self):
|
|
result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = 'drive_error',
|
|
id = '0')
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
def test_error_no_id_and_name(self):
|
|
result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = self.expect[0]['device'])
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
def test_error_snapshot_not_exist(self):
|
|
self.createSnapshotInTransaction(2)
|
|
self.verifySnapshotInfo()
|
|
result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
|
|
device = self.expect[0]['device'],
|
|
id = self.expect[0]['snapshots'][0]['id'],
|
|
name = self.expect[0]['snapshots'][1]['name'])
|
|
self.assert_qmp(result, 'error/class', 'GenericError')
|
|
|
|
if __name__ == '__main__':
|
|
iotests.main(supported_fmts=['qcow2'],
|
|
supported_protocols=['file'])
|