Kaynağa Gözat

support/testing/infra/emulator.py: fix qemu prompt detection

The qemu.run() method can break when a command happens to output the
string "# " to stdout. This is because qemu.run() detects when a command
has completed by searching for the shell prompt, which by default is
"# ". It then captures everything before the "# " as the commands
output, causing the rest of output to be lost.

Instead use the pexpect libraries REPLWrapper to handle running
commands. It has hooks to set a custom prompt and avoid some other
pitfalls of wrapping a shell.

We unfortunately can't reuse replwrap._repl_sh directly, because it
tries to spawn a command while we already have a qemu started. So we
need to copy that code into _repl_sh_child. While we're at it, also
define our own prompt strings.

Signed-off-by: Brandon Maier <brandon.maier@collins.com>
[Arnout:
 - Make all arguments to _repl_sh_spawn non-optional.
 - Move non_printable_insert to a local variable instead of an argument,
   we don't need to override it.
 - Copy the comment from _repl_sh that explains why non_printable_insert
   is needed.
 - Add a comment about timeouts.
 - Rename spawn to child (we don't actually spawn anything so this felt
   more natural, even though the class
 - Use single quotes instead of triple quotes, and explicitly escape the
   nested quotes.
]
Signed-off-by: Arnout Vandecappelle <arnout@mind.be>
Brandon Maier 1 yıl önce
ebeveyn
işleme
0cad947b96
1 değiştirilmiş dosya ile 47 ekleme ve 11 silme
  1. 47 11
      support/testing/infra/emulator.py

+ 47 - 11
support/testing/infra/emulator.py

@@ -1,12 +1,44 @@
 import pexpect
 import pexpect
+import pexpect.replwrap
 
 
 import infra
 import infra
 
 
+BR_PROMPT = '[BRTEST# '
+BR_CONTINUATION_PROMPT = '[BRTEST+ '
+
+
+def _repl_sh_child(child, orig_prompt, extra_init_cmd):
+    """Wrap the shell prompt to handle command output
+    Based on pexpect.replwrap._repl_sh() (ISC licensed)
+    https://github.com/pexpect/pexpect/blob/aa989594e1e413f45c18b26ded1783f7d5990fe5/pexpect/replwrap.py#L115
+    """
+
+    # If the user runs 'env', the value of PS1 will be in the output. To avoid
+    # replwrap seeing that as the next prompt, we'll embed the marker characters
+    # for invisible characters in the prompt; these show up when inspecting the
+    # environment variable, but not when bash displays the prompt.
+    non_printable_insert = '\\[\\]'
+    ps1 = BR_PROMPT[:5] + non_printable_insert + BR_PROMPT[5:]
+    ps2 = (BR_CONTINUATION_PROMPT[:5] + non_printable_insert +
+           BR_CONTINUATION_PROMPT[5:])
+    prompt_change = "PS1='{0}' PS2='{1}' PROMPT_COMMAND=''".format(ps1, ps2)
+    # Note: this will run various commands, each with the default timeout defined
+    # when qemu was spawned.
+    return pexpect.replwrap.REPLWrapper(
+            child,
+            orig_prompt,
+            prompt_change,
+            new_prompt=BR_PROMPT,
+            continuation_prompt=BR_CONTINUATION_PROMPT,
+            extra_init_cmd=extra_init_cmd
+        )
+
 
 
 class Emulator(object):
 class Emulator(object):
 
 
     def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
     def __init__(self, builddir, downloaddir, logtofile, timeout_multiplier):
         self.qemu = None
         self.qemu = None
+        self.repl = None
         self.downloaddir = downloaddir
         self.downloaddir = downloaddir
         self.logfile = infra.open_log_file(builddir, "run", logtofile)
         self.logfile = infra.open_log_file(builddir, "run", logtofile)
         # We use elastic runners on the cloud to runs our tests. Those runners
         # We use elastic runners on the cloud to runs our tests. Those runners
@@ -97,26 +129,30 @@ class Emulator(object):
         if password:
         if password:
             self.qemu.expect("Password:")
             self.qemu.expect("Password:")
             self.qemu.sendline(password)
             self.qemu.sendline(password)
-        index = self.qemu.expect(["# ", pexpect.TIMEOUT])
-        if index != 0:
-            raise SystemError("Cannot login")
-        self.run("dmesg -n 1")
-        # Prevent the shell from wrapping the commands at 80 columns.
-        self.run("stty columns 29999")
+
+        extra_init_cmd = " && ".join([
+            'export PAGER=cat',
+            'dmesg -n 1',
+            # Prevent the shell from wrapping the commands at 80 columns.
+            'stty columns 29999',
+            # Fix the prompt of any subshells that get run
+            'printf "%s\n"  "PS1=\'$PS1\'" "PS2=\'$PS2\'" "PROMPT_COMMAND=\'\'" >>/etc/profile'
+        ])
+        self.repl = _repl_sh_child(self.qemu, '# ', extra_init_cmd)
+        if not self.repl:
+            raise SystemError("Cannot initialize REPL prompt")
 
 
     # Run the given 'cmd' with a 'timeout' on the target
     # Run the given 'cmd' with a 'timeout' on the target
     # return a tuple (output, exit_code)
     # return a tuple (output, exit_code)
     def run(self, cmd, timeout=-1):
     def run(self, cmd, timeout=-1):
-        self.qemu.sendline(cmd)
         if timeout != -1:
         if timeout != -1:
             timeout *= self.timeout_multiplier
             timeout *= self.timeout_multiplier
-        self.qemu.expect("# ", timeout=timeout)
+        output = self.repl.run_command(cmd, timeout=timeout)
         # Remove double carriage return from qemu stdout so str.splitlines()
         # Remove double carriage return from qemu stdout so str.splitlines()
         # works as expected.
         # works as expected.
-        output = self.qemu.before.replace("\r\r", "\r").splitlines()[1:]
+        output = output.replace("\r\r", "\r").splitlines()[1:]
 
 
-        self.qemu.sendline("echo $?")
-        self.qemu.expect("# ")
+        exit_code = self.repl.run_command("echo $?")
         exit_code = self.qemu.before.splitlines()[2]
         exit_code = self.qemu.before.splitlines()[2]
         exit_code = int(exit_code)
         exit_code = int(exit_code)