emulator.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. # SPDX-License-Identifier: GPL-2.0
  2. # SPDX-License-Identifier: ISC
  3. import os
  4. import pexpect
  5. import pexpect.replwrap
  6. import time
  7. import infra
  8. BR_PROMPT = '[BRTEST# '
  9. BR_CONTINUATION_PROMPT = '[BRTEST+ '
  10. def _repl_sh_child(child, orig_prompt, extra_init_cmd):
  11. """Wrap the shell prompt to handle command output
  12. Based on pexpect.replwrap._repl_sh() (ISC licensed)
  13. https://github.com/pexpect/pexpect/blob/aa989594e1e413f45c18b26ded1783f7d5990fe5/pexpect/replwrap.py#L115
  14. """
  15. # If the user runs 'env', the value of PS1 will be in the output. To avoid
  16. # replwrap seeing that as the next prompt, we'll embed the marker characters
  17. # for invisible characters in the prompt; these show up when inspecting the
  18. # environment variable, but not when bash displays the prompt.
  19. non_printable_insert = '\\[\\]'
  20. ps1 = BR_PROMPT[:5] + non_printable_insert + BR_PROMPT[5:]
  21. ps2 = (BR_CONTINUATION_PROMPT[:5] + non_printable_insert +
  22. BR_CONTINUATION_PROMPT[5:])
  23. prompt_change = "PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)
  24. # Note: this will run various commands, each with the default timeout defined
  25. # when qemu was spawned.
  26. return pexpect.replwrap.REPLWrapper(
  27. child,
  28. orig_prompt,
  29. prompt_change,
  30. new_prompt=BR_PROMPT,
  31. continuation_prompt=BR_CONTINUATION_PROMPT,
  32. extra_init_cmd=extra_init_cmd
  33. )
  34. class Emulator(object):
  35. def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
  36. self.qemu = None
  37. self.repl = None
  38. self.builddir = builddir
  39. self.downloaddir = downloaddir
  40. self.logfile = infra.open_log_file(builddir, "run", logtofile)
  41. # We use elastic runners on the cloud to runs our tests. Those runners
  42. # can take a long time to run the emulator. Use a timeout multiplier
  43. # when running the tests to avoid sporadic failures.
  44. self.timeout_multiplier = timeout_multiplier
  45. # Start Qemu to boot the system
  46. #
  47. # arch: Qemu architecture to use
  48. #
  49. # kernel: path to the kernel image, or the special string
  50. # 'builtin'. 'builtin' means a pre-built kernel image will be
  51. # downloaded from ARTIFACTS_URL and suitable options are
  52. # automatically passed to qemu and added to the kernel cmdline. So
  53. # far only armv5, armv7 and i386 builtin kernels are available.
  54. # If None, then no kernel is used, and we assume a bootable device
  55. # will be specified.
  56. #
  57. # kernel_cmdline: array of kernel arguments to pass to Qemu -append option
  58. #
  59. # options: array of command line options to pass to Qemu
  60. #
  61. def boot(self, arch, kernel=None, kernel_cmdline=None, options=None):
  62. if arch in ["armv7", "armv5"]:
  63. qemu_arch = "arm"
  64. else:
  65. qemu_arch = arch
  66. qemu_cmd = ["qemu-system-{}".format(qemu_arch),
  67. "-serial", "stdio",
  68. "-display", "none",
  69. "-m", "256"]
  70. if options:
  71. qemu_cmd += options
  72. if kernel_cmdline is None:
  73. kernel_cmdline = []
  74. if kernel:
  75. if kernel == "builtin":
  76. if arch in ["armv7", "armv5"]:
  77. kernel_cmdline.append("console=ttyAMA0")
  78. if arch == "armv7":
  79. kernel = infra.download(self.downloaddir,
  80. "kernel-vexpress-5.10.202")
  81. dtb = infra.download(self.downloaddir,
  82. "vexpress-v2p-ca9-5.10.202.dtb")
  83. qemu_cmd += ["-dtb", dtb]
  84. qemu_cmd += ["-M", "vexpress-a9"]
  85. elif arch == "armv5":
  86. kernel = infra.download(self.downloaddir,
  87. "kernel-versatile-5.10.202")
  88. dtb = infra.download(self.downloaddir,
  89. "versatile-pb-5.10.202.dtb")
  90. qemu_cmd += ["-dtb", dtb]
  91. qemu_cmd += ["-M", "versatilepb"]
  92. qemu_cmd += ["-device", "virtio-rng-pci"]
  93. qemu_cmd += ["-kernel", kernel]
  94. if kernel_cmdline:
  95. qemu_cmd += ["-append", " ".join(kernel_cmdline)]
  96. self.logfile.write(f"> host cpu count: {os.cpu_count()}\n")
  97. ldavg = os.getloadavg()
  98. ldavg_str = f"{ldavg[0]:.2f}, {ldavg[1]:.2f}, {ldavg[2]:.2f}"
  99. self.logfile.write(f"> host loadavg: {ldavg_str}\n")
  100. self.logfile.write(f"> timeout multiplier: {self.timeout_multiplier}\n")
  101. self.logfile.write(f"> emulator using {qemu_cmd[0]} version:\n")
  102. host_bin = os.path.join(self.builddir, "host", "bin")
  103. br_path = host_bin + os.pathsep + os.environ["PATH"]
  104. qemu_env = {"QEMU_AUDIO_DRV": "none",
  105. "PATH": br_path}
  106. pexpect.run(f"{qemu_cmd[0]} --version",
  107. encoding='utf-8',
  108. logfile=self.logfile,
  109. env=qemu_env)
  110. self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
  111. self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:],
  112. timeout=5 * self.timeout_multiplier,
  113. encoding='utf-8',
  114. codec_errors='replace',
  115. env=qemu_env)
  116. # We want only stdout into the log to avoid double echo
  117. self.qemu.logfile_read = self.logfile
  118. # Wait for the login prompt to appear, and then login as root with
  119. # the provided password, or no password if not specified.
  120. def login(self, password=None, timeout=60):
  121. # The login prompt can take some time to appear when running multiple
  122. # instances in parallel, so set the timeout to a large value
  123. index = self.qemu.expect(["buildroot login:", pexpect.TIMEOUT],
  124. timeout=timeout * self.timeout_multiplier)
  125. if index != 0:
  126. self.logfile.write("==> System does not boot")
  127. raise SystemError("System does not boot")
  128. self.qemu.sendline("root")
  129. if password:
  130. self.qemu.expect("Password:")
  131. self.qemu.sendline(password)
  132. self.connect_shell()
  133. output, exit_code = self.run(f"date -s @{int(time.time())}")
  134. if exit_code:
  135. raise SystemError("Cannot set date in virtual machine")
  136. def connect_shell(self):
  137. extra_init_cmd = " && ".join([
  138. 'export PAGER=cat',
  139. 'dmesg -n 1',
  140. # Prevent the shell from wrapping the commands at 80 columns.
  141. 'stty columns 29999',
  142. # Fix the prompt of any subshells that get run
  143. 'printf "%s\n" "PS1=\'$PS1\'" "PS2=\'$PS2\'" "PROMPT_COMMAND=\'\'" >>/etc/profile'
  144. ])
  145. self.repl = _repl_sh_child(self.qemu, '# ', extra_init_cmd)
  146. if not self.repl:
  147. raise SystemError("Cannot initialize REPL prompt")
  148. # Run the given 'cmd' with a 'timeout' on the target
  149. # return a tuple (output, exit_code)
  150. def run(self, cmd, timeout=-1):
  151. if timeout != -1:
  152. timeout *= self.timeout_multiplier
  153. output = self.repl.run_command(cmd, timeout=timeout)
  154. # Remove double carriage return from qemu stdout so str.splitlines()
  155. # works as expected.
  156. output = output.replace("\r\r", "\r").splitlines()[1:]
  157. exit_code = self.repl.run_command("echo $?")
  158. exit_code = self.qemu.before.splitlines()[2]
  159. exit_code = int(exit_code)
  160. return output, exit_code
  161. def stop(self):
  162. if self.qemu is None:
  163. return
  164. self.qemu.terminate(force=True)