qemu/tests/migration/guestperf/engine.py
<<
>>
Prefs
   1#
   2# Migration test main engine
   3#
   4# Copyright (c) 2016 Red Hat, Inc.
   5#
   6# This library is free software; you can redistribute it and/or
   7# modify it under the terms of the GNU Lesser General Public
   8# License as published by the Free Software Foundation; either
   9# version 2.1 of the License, or (at your option) any later version.
  10#
  11# This library is distributed in the hope that it will be useful,
  12# but WITHOUT ANY WARRANTY; without even the implied warranty of
  13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  14# Lesser General Public License for more details.
  15#
  16# You should have received a copy of the GNU Lesser General Public
  17# License along with this library; if not, see <http://www.gnu.org/licenses/>.
  18#
  19
  20
  21import os
  22import re
  23import sys
  24import time
  25
  26from guestperf.progress import Progress, ProgressStats
  27from guestperf.report import Report
  28from guestperf.timings import TimingRecord, Timings
  29
  30sys.path.append(os.path.join(os.path.dirname(__file__),
  31                             '..', '..', '..', 'python'))
  32from qemu.machine import QEMUMachine
  33
  34
  35class Engine(object):
  36
  37    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
  38                 sleep=15, verbose=False, debug=False):
  39
  40        self._binary = binary # Path to QEMU binary
  41        self._dst_host = dst_host # Hostname of target host
  42        self._kernel = kernel # Path to kernel image
  43        self._initrd = initrd # Path to stress initrd
  44        self._transport = transport # 'unix' or 'tcp' or 'rdma'
  45        self._sleep = sleep
  46        self._verbose = verbose
  47        self._debug = debug
  48
  49        if debug:
  50            self._verbose = debug
  51
  52    def _vcpu_timing(self, pid, tid_list):
  53        records = []
  54        now = time.time()
  55
  56        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  57        for tid in tid_list:
  58            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
  59            with open(statfile, "r") as fh:
  60                stat = fh.readline()
  61                fields = stat.split(" ")
  62                stime = int(fields[13])
  63                utime = int(fields[14])
  64                records.append(TimingRecord(tid, now, 1000 * (stime + utime) / jiffies_per_sec))
  65        return records
  66
  67    def _cpu_timing(self, pid):
  68        records = []
  69        now = time.time()
  70
  71        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
  72        statfile = "/proc/%d/stat" % pid
  73        with open(statfile, "r") as fh:
  74            stat = fh.readline()
  75            fields = stat.split(" ")
  76            stime = int(fields[13])
  77            utime = int(fields[14])
  78            return TimingRecord(pid, now, 1000 * (stime + utime) / jiffies_per_sec)
  79
  80    def _migrate_progress(self, vm):
  81        info = vm.command("query-migrate")
  82
  83        if "ram" not in info:
  84            info["ram"] = {}
  85
  86        return Progress(
  87            info.get("status", "active"),
  88            ProgressStats(
  89                info["ram"].get("transferred", 0),
  90                info["ram"].get("remaining", 0),
  91                info["ram"].get("total", 0),
  92                info["ram"].get("duplicate", 0),
  93                info["ram"].get("skipped", 0),
  94                info["ram"].get("normal", 0),
  95                info["ram"].get("normal-bytes", 0),
  96                info["ram"].get("dirty-pages-rate", 0),
  97                info["ram"].get("mbps", 0),
  98                info["ram"].get("dirty-sync-count", 0)
  99            ),
 100            time.time(),
 101            info.get("total-time", 0),
 102            info.get("downtime", 0),
 103            info.get("expected-downtime", 0),
 104            info.get("setup-time", 0),
 105            info.get("cpu-throttle-percentage", 0),
 106        )
 107
 108    def _migrate(self, hardware, scenario, src, dst, connect_uri):
 109        src_qemu_time = []
 110        src_vcpu_time = []
 111        src_pid = src.get_pid()
 112
 113        vcpus = src.command("query-cpus-fast")
 114        src_threads = []
 115        for vcpu in vcpus:
 116            src_threads.append(vcpu["thread-id"])
 117
 118        # XXX how to get dst timings on remote host ?
 119
 120        if self._verbose:
 121            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
 122        sleep_secs = self._sleep
 123        while sleep_secs > 1:
 124            src_qemu_time.append(self._cpu_timing(src_pid))
 125            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
 126            time.sleep(1)
 127            sleep_secs -= 1
 128
 129        if self._verbose:
 130            print("Starting migration")
 131        if scenario._auto_converge:
 132            resp = src.command("migrate-set-capabilities",
 133                               capabilities = [
 134                                   { "capability": "auto-converge",
 135                                     "state": True }
 136                               ])
 137            resp = src.command("migrate-set-parameters",
 138                               cpu_throttle_increment=scenario._auto_converge_step)
 139
 140        if scenario._post_copy:
 141            resp = src.command("migrate-set-capabilities",
 142                               capabilities = [
 143                                   { "capability": "postcopy-ram",
 144                                     "state": True }
 145                               ])
 146            resp = dst.command("migrate-set-capabilities",
 147                               capabilities = [
 148                                   { "capability": "postcopy-ram",
 149                                     "state": True }
 150                               ])
 151
 152        resp = src.command("migrate-set-parameters",
 153                           max_bandwidth=scenario._bandwidth * 1024 * 1024)
 154
 155        resp = src.command("migrate-set-parameters",
 156                           downtime_limit=scenario._downtime)
 157
 158        if scenario._compression_mt:
 159            resp = src.command("migrate-set-capabilities",
 160                               capabilities = [
 161                                   { "capability": "compress",
 162                                     "state": True }
 163                               ])
 164            resp = src.command("migrate-set-parameters",
 165                               compress_threads=scenario._compression_mt_threads)
 166            resp = dst.command("migrate-set-capabilities",
 167                               capabilities = [
 168                                   { "capability": "compress",
 169                                     "state": True }
 170                               ])
 171            resp = dst.command("migrate-set-parameters",
 172                               decompress_threads=scenario._compression_mt_threads)
 173
 174        if scenario._compression_xbzrle:
 175            resp = src.command("migrate-set-capabilities",
 176                               capabilities = [
 177                                   { "capability": "xbzrle",
 178                                     "state": True }
 179                               ])
 180            resp = dst.command("migrate-set-capabilities",
 181                               capabilities = [
 182                                   { "capability": "xbzrle",
 183                                     "state": True }
 184                               ])
 185            resp = src.command("migrate-set-parameters",
 186                               xbzrle_cache_size=(
 187                                   hardware._mem *
 188                                   1024 * 1024 * 1024 / 100 *
 189                                   scenario._compression_xbzrle_cache))
 190
 191        if scenario._multifd:
 192            resp = src.command("migrate-set-capabilities",
 193                               capabilities = [
 194                                   { "capability": "multifd",
 195                                     "state": True }
 196                               ])
 197            resp = src.command("migrate-set-parameters",
 198                               multifd_channels=scenario._multifd_channels)
 199            resp = dst.command("migrate-set-capabilities",
 200                               capabilities = [
 201                                   { "capability": "multifd",
 202                                     "state": True }
 203                               ])
 204            resp = dst.command("migrate-set-parameters",
 205                               multifd_channels=scenario._multifd_channels)
 206
 207        resp = src.command("migrate", uri=connect_uri)
 208
 209        post_copy = False
 210        paused = False
 211
 212        progress_history = []
 213
 214        start = time.time()
 215        loop = 0
 216        while True:
 217            loop = loop + 1
 218            time.sleep(0.05)
 219
 220            progress = self._migrate_progress(src)
 221            if (loop % 20) == 0:
 222                src_qemu_time.append(self._cpu_timing(src_pid))
 223                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
 224
 225            if (len(progress_history) == 0 or
 226                (progress_history[-1]._ram._iterations <
 227                 progress._ram._iterations)):
 228                progress_history.append(progress)
 229
 230            if progress._status in ("completed", "failed", "cancelled"):
 231                if progress._status == "completed" and paused:
 232                    dst.command("cont")
 233                if progress_history[-1] != progress:
 234                    progress_history.append(progress)
 235
 236                if progress._status == "completed":
 237                    if self._verbose:
 238                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
 239                    sleep_secs = self._sleep
 240                    while sleep_secs > 1:
 241                        time.sleep(1)
 242                        src_qemu_time.append(self._cpu_timing(src_pid))
 243                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
 244                        sleep_secs -= 1
 245
 246                return [progress_history, src_qemu_time, src_vcpu_time]
 247
 248            if self._verbose and (loop % 20) == 0:
 249                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
 250                    progress._ram._iterations,
 251                    progress._ram._remaining_bytes / (1024 * 1024),
 252                    progress._ram._total_bytes / (1024 * 1024),
 253                    progress._ram._transferred_bytes / (1024 * 1024),
 254                    progress._ram._transfer_rate_mbs,
 255                ))
 256
 257            if progress._ram._iterations > scenario._max_iters:
 258                if self._verbose:
 259                    print("No completion after %d iterations over RAM" % scenario._max_iters)
 260                src.command("migrate_cancel")
 261                continue
 262
 263            if time.time() > (start + scenario._max_time):
 264                if self._verbose:
 265                    print("No completion after %d seconds" % scenario._max_time)
 266                src.command("migrate_cancel")
 267                continue
 268
 269            if (scenario._post_copy and
 270                progress._ram._iterations >= scenario._post_copy_iters and
 271                not post_copy):
 272                if self._verbose:
 273                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
 274                resp = src.command("migrate-start-postcopy")
 275                post_copy = True
 276
 277            if (scenario._pause and
 278                progress._ram._iterations >= scenario._pause_iters and
 279                not paused):
 280                if self._verbose:
 281                    print("Pausing VM after %d iterations" % scenario._pause_iters)
 282                resp = src.command("stop")
 283                paused = True
 284
 285    def _get_common_args(self, hardware, tunnelled=False):
 286        args = [
 287            "noapic",
 288            "edd=off",
 289            "printk.time=1",
 290            "noreplace-smp",
 291            "cgroup_disable=memory",
 292            "pci=noearly",
 293            "console=ttyS0",
 294        ]
 295        if self._debug:
 296            args.append("debug")
 297        else:
 298            args.append("quiet")
 299
 300        args.append("ramsize=%s" % hardware._mem)
 301
 302        cmdline = " ".join(args)
 303        if tunnelled:
 304            cmdline = "'" + cmdline + "'"
 305
 306        argv = [
 307            "-accel", "kvm",
 308            "-cpu", "host",
 309            "-kernel", self._kernel,
 310            "-initrd", self._initrd,
 311            "-append", cmdline,
 312            "-chardev", "stdio,id=cdev0",
 313            "-device", "isa-serial,chardev=cdev0",
 314            "-m", str((hardware._mem * 1024) + 512),
 315            "-smp", str(hardware._cpus),
 316        ]
 317
 318        if self._debug:
 319            argv.extend(["-device", "sga"])
 320
 321        if hardware._prealloc_pages:
 322            argv_source += ["-mem-path", "/dev/shm",
 323                            "-mem-prealloc"]
 324        if hardware._locked_pages:
 325            argv_source += ["-overcommit", "mem-lock=on"]
 326        if hardware._huge_pages:
 327            pass
 328
 329        return argv
 330
 331    def _get_src_args(self, hardware):
 332        return self._get_common_args(hardware)
 333
 334    def _get_dst_args(self, hardware, uri):
 335        tunnelled = False
 336        if self._dst_host != "localhost":
 337            tunnelled = True
 338        argv = self._get_common_args(hardware, tunnelled)
 339        return argv + ["-incoming", uri]
 340
 341    @staticmethod
 342    def _get_common_wrapper(cpu_bind, mem_bind):
 343        wrapper = []
 344        if len(cpu_bind) > 0 or len(mem_bind) > 0:
 345            wrapper.append("numactl")
 346            if cpu_bind:
 347                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
 348            if mem_bind:
 349                wrapper.append("--membind=%s" % ",".join(mem_bind))
 350
 351        return wrapper
 352
 353    def _get_src_wrapper(self, hardware):
 354        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)
 355
 356    def _get_dst_wrapper(self, hardware):
 357        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
 358        if self._dst_host != "localhost":
 359            return ["ssh",
 360                    "-R", "9001:localhost:9001",
 361                    self._dst_host] + wrapper
 362        else:
 363            return wrapper
 364
 365    def _get_timings(self, vm):
 366        log = vm.get_log()
 367        if not log:
 368            return []
 369        if self._debug:
 370            print(log)
 371
 372        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
 373        matcher = re.compile(regex)
 374        records = []
 375        for line in log.split("\n"):
 376            match = matcher.match(line)
 377            if match:
 378                records.append(TimingRecord(int(match.group(1)),
 379                                            int(match.group(2)) / 1000.0,
 380                                            int(match.group(3))))
 381        return records
 382
 383    def run(self, hardware, scenario, result_dir=os.getcwd()):
 384        abs_result_dir = os.path.join(result_dir, scenario._name)
 385
 386        if self._transport == "tcp":
 387            uri = "tcp:%s:9000" % self._dst_host
 388        elif self._transport == "rdma":
 389            uri = "rdma:%s:9000" % self._dst_host
 390        elif self._transport == "unix":
 391            if self._dst_host != "localhost":
 392                raise Exception("Running use unix migration transport for non-local host")
 393            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
 394            try:
 395                os.remove(uri[5:])
 396                os.remove(monaddr)
 397            except:
 398                pass
 399
 400        if self._dst_host != "localhost":
 401            dstmonaddr = ("localhost", 9001)
 402        else:
 403            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
 404        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()
 405
 406        src = QEMUMachine(self._binary,
 407                          args=self._get_src_args(hardware),
 408                          wrapper=self._get_src_wrapper(hardware),
 409                          name="qemu-src-%d" % os.getpid(),
 410                          monitor_address=srcmonaddr)
 411
 412        dst = QEMUMachine(self._binary,
 413                          args=self._get_dst_args(hardware, uri),
 414                          wrapper=self._get_dst_wrapper(hardware),
 415                          name="qemu-dst-%d" % os.getpid(),
 416                          monitor_address=dstmonaddr)
 417
 418        try:
 419            src.launch()
 420            dst.launch()
 421
 422            ret = self._migrate(hardware, scenario, src, dst, uri)
 423            progress_history = ret[0]
 424            qemu_timings = ret[1]
 425            vcpu_timings = ret[2]
 426            if uri[0:5] == "unix:" and os.path.exists(uri[5:]):
 427                os.remove(uri[5:])
 428
 429            if os.path.exists(srcmonaddr):
 430                os.remove(srcmonaddr)
 431
 432            if self._dst_host == "localhost" and os.path.exists(dstmonaddr):
 433                os.remove(dstmonaddr)
 434
 435            if self._verbose:
 436                print("Finished migration")
 437
 438            src.shutdown()
 439            dst.shutdown()
 440
 441            return Report(hardware, scenario, progress_history,
 442                          Timings(self._get_timings(src) + self._get_timings(dst)),
 443                          Timings(qemu_timings),
 444                          Timings(vcpu_timings),
 445                          self._binary, self._dst_host, self._kernel,
 446                          self._initrd, self._transport, self._sleep)
 447        except Exception as e:
 448            if self._debug:
 449                print("Failed: %s" % str(e))
 450            try:
 451                src.shutdown()
 452            except:
 453                pass
 454            try:
 455                dst.shutdown()
 456            except:
 457                pass
 458
 459            if self._debug:
 460                print(src.get_log())
 461                print(dst.get_log())
 462            raise
 463
 464