emulator.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. import pexpect
  2. import infra
  3. import infra.basetest
# TODO: Most of the telnet handling needs to be replaced by stdio/pexpect to
# communicate directly with the qemu machine.
  6. class Emulator(object):
  7. def __init__(self, builddir, downloaddir, logtofile):
  8. self.qemu = None
  9. self.__tn = None
  10. self.downloaddir = downloaddir
  11. self.log = ""
  12. self.logfile = infra.open_log_file(builddir, "run", logtofile)
  13. # Start Qemu to boot the system
  14. #
  15. # arch: Qemu architecture to use
  16. #
  17. # kernel: path to the kernel image, or the special string
  18. # 'builtin'. 'builtin' means a pre-built kernel image will be
  19. # downloaded from ARTEFACTS_URL and suitable options are
  20. # automatically passed to qemu and added to the kernel cmdline. So
  21. # far only armv5, armv7 and i386 builtin kernels are available.
  22. # If None, then no kernel is used, and we assume a bootable device
  23. # will be specified.
  24. #
  25. # kernel_cmdline: array of kernel arguments to pass to Qemu -append option
  26. #
  27. # options: array of command line options to pass to Qemu
  28. #
  29. def boot(self, arch, kernel=None, kernel_cmdline=None, options=None):
  30. if arch in ["armv7", "armv5"]:
  31. qemu_arch = "arm"
  32. else:
  33. qemu_arch = arch
  34. qemu_cmd = ["qemu-system-{}".format(qemu_arch),
  35. "-serial", "telnet::1234,server",
  36. "-display", "none"]
  37. if options:
  38. qemu_cmd += options
  39. if kernel_cmdline is None:
  40. kernel_cmdline = []
  41. if kernel:
  42. if kernel == "builtin":
  43. if arch in ["armv7", "armv5"]:
  44. kernel_cmdline.append("console=ttyAMA0")
  45. if arch == "armv7":
  46. kernel = infra.download(self.downloaddir,
  47. "kernel-vexpress")
  48. dtb = infra.download(self.downloaddir,
  49. "vexpress-v2p-ca9.dtb")
  50. qemu_cmd += ["-dtb", dtb]
  51. qemu_cmd += ["-M", "vexpress-a9"]
  52. elif arch == "armv5":
  53. kernel = infra.download(self.downloaddir,
  54. "kernel-versatile")
  55. qemu_cmd += ["-M", "versatilepb"]
  56. qemu_cmd += ["-kernel", kernel]
  57. if kernel_cmdline:
  58. qemu_cmd += ["-append", " ".join(kernel_cmdline)]
  59. self.logfile.write("> starting qemu with '%s'\n" % " ".join(qemu_cmd))
  60. self.qemu = pexpect.spawn(qemu_cmd[0], qemu_cmd[1:])
  61. # Wait for the telnet port to appear and connect to it.
  62. self.qemu.expect("waiting for connection")
  63. telnet_cmd = ["telnet", "localhost", "1234"]
  64. self.__tn = pexpect.spawn(telnet_cmd[0], telnet_cmd[1:])
  65. def __read_until(self, waitstr, timeout=5):
  66. index = self.__tn.expect([waitstr, pexpect.TIMEOUT], timeout=timeout)
  67. data = self.__tn.before
  68. if index == 0:
  69. data += self.__tn.after
  70. self.log += data
  71. self.logfile.write(data)
  72. return data
  73. def __write(self, wstr):
  74. self.__tn.send(wstr)
  75. # Wait for the login prompt to appear, and then login as root with
  76. # the provided password, or no password if not specified.
  77. def login(self, password=None):
  78. self.__read_until("buildroot login:", 10)
  79. if "buildroot login:" not in self.log:
  80. self.logfile.write("==> System does not boot")
  81. raise SystemError("System does not boot")
  82. self.__write("root\n")
  83. if password:
  84. self.__read_until("Password:")
  85. self.__write(password + "\n")
  86. self.__read_until("# ")
  87. if "# " not in self.log:
  88. raise SystemError("Cannot login")
  89. self.run("dmesg -n 1")
  90. # Run the given 'cmd' on the target
  91. # return a tuple (output, exit_code)
  92. def run(self, cmd):
  93. self.__write(cmd + "\n")
  94. output = self.__read_until("# ")
  95. output = output.strip().splitlines()
  96. output = output[1:len(output)-1]
  97. self.__write("echo $?\n")
  98. exit_code = self.__read_until("# ")
  99. exit_code = exit_code.strip().splitlines()[1]
  100. exit_code = int(exit_code)
  101. return output, exit_code
  102. def stop(self):
  103. if self.__tn:
  104. self.__tn.terminate(force=True)
  105. if self.qemu is None:
  106. return
  107. self.qemu.terminate(force=True)