123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- import pexpect
- import infra
- class Emulator(object):
- def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
- self.qemu = None
- self.downloaddir = downloaddir
- self.logfile = infra.open_log_file(builddir, "run", logtofile)
- # We use elastic runners on the cloud to runs our tests. Those runners
- # can take a long time to run the emulator. Use a timeout multiplier
- # when running the tests to avoid sporadic failures.
- self.timeout_multiplier = timeout_multiplier
- # Start Qemu to boot the system
- #
- # arch: Qemu architecture to use
- #
- # kernel: path to the kernel image, or the special string
- # 'builtin'. 'builtin' means a pre-built kernel image will be
- # downloaded from ARTEFACTS_URL and suitable options are
- # automatically passed to qemu and added to the kernel cmdline. So
- # far only armv5, armv7 and i386 builtin kernels are available.
- # If None, then no kernel is used, and we assume a bootable device
- # will be specified.
- #
- # kernel_cmdline: array of kernel arguments to pass to Qemu -append option
- #
- # options: array of command line options to pass to Qemu
- #
- def boot(self, arch, kernel=None, kernel_cmdline=None, options=None):
- if arch in ["armv7", "armv5"]:
- qemu_arch = "arm"
- else:
- qemu_arch = arch
- qemu_cmd = ["qemu-system-{}".format(qemu_arch),
- "-serial", "stdio",
- "-display", "none",
- "-m", "256"]
- if options:
- qemu_cmd += options
- if kernel_cmdline is None:
- kernel_cmdline = []
- if kernel:
- if kernel == "builtin":
- if arch in ["armv7", "armv5"]:
- kernel_cmdline.append("console=ttyAMA0")
- if arch == "armv7":
- kernel = infra.download(self.downloaddir,
- "kernel-vexpress")
- dtb = infra.download(self.downloaddir,
- "vexpress-v2p-ca9.dtb")
- qemu_cmd += ["-dtb", dtb]
- qemu_cmd += ["-M", "vexpress-a9"]
- elif arch == "armv5":
- kernel = infra.download(self.downloaddir,
- "kernel-versatile-4.19")
- dtb = infra.download(self.downloaddir,
- "versatile-pb-4.19.dtb")
- qemu_cmd += ["-dtb", dtb]
- qemu_cmd += ["-M", "versatilepb"]
- qemu_cmd += ["-device", "virtio-rng-pci"]
- qemu_cmd += ["-kernel", kernel]
- if kernel_cmdline:
- qemu_cmd += ["-append", " ".join(kernel_cmdline)]
- self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
- self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:],
- timeout=5 * self.timeout_multiplier,
- encoding='utf-8',
- env={"QEMU_AUDIO_DRV": "none"})
- # We want only stdout into the log to avoid double echo
- self.qemu.logfile_read = self.logfile
- # Wait for the login prompt to appear, and then login as root with
- # the provided password, or no password if not specified.
- def login(self, password=None):
- # The login prompt can take some time to appear when running multiple
- # instances in parallel, so set the timeout to a large value
- index = self.qemu.expect(["buildroot login:", pexpect.TIMEOUT],
- timeout=60 * self.timeout_multiplier)
- if index != 0:
- self.logfile.write("==> System does not boot")
- raise SystemError("System does not boot")
- self.qemu.sendline("root")
- if password:
- self.qemu.expect("Password:")
- self.qemu.sendline(password)
- index = self.qemu.expect(["# ", pexpect.TIMEOUT])
- if index != 0:
- raise SystemError("Cannot login")
- self.run("dmesg -n 1")
- # Run the given 'cmd' with a 'timeout' on the target
- # return a tuple (output, exit_code)
- def run(self, cmd, timeout=-1):
- self.qemu.sendline(cmd)
- if timeout != -1:
- timeout *= self.timeout_multiplier
- self.qemu.expect("# ", timeout=timeout)
- # Remove double carriage return from qemu stdout so str.splitlines()
- # works as expected.
- output = self.qemu.before.replace("\r\r", "\r").splitlines()[1:]
- self.qemu.sendline("echo $?")
- self.qemu.expect("# ")
- exit_code = self.qemu.before.splitlines()[2]
- exit_code = int(exit_code)
- return output, exit_code
- def stop(self):
- if self.qemu is None:
- return
- self.qemu.terminate(force=True)
|