test_mosquitto.py 1.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. import os
  2. import time
  3. import infra.basetest
  4. class TestMosquitto(infra.basetest.BRTest):
  5. config = infra.basetest.BASIC_TOOLCHAIN_CONFIG + \
  6. """
  7. BR2_PACKAGE_MOSQUITTO=y
  8. BR2_PACKAGE_MOSQUITTO_BROKER=y
  9. BR2_TARGET_ROOTFS_CPIO=y
  10. # BR2_TARGET_ROOTFS_TAR is not set
  11. """
  12. def test_run(self):
  13. cpio_file = os.path.join(self.builddir, "images", "rootfs.cpio")
  14. self.emulator.boot(arch="armv5",
  15. kernel="builtin",
  16. options=["-initrd", cpio_file])
  17. self.emulator.login()
  18. topic = "br-test-topic"
  19. log = "mqtt.log"
  20. msg = "Hello Buildroot!"
  21. # We subscribe to a topic and write one message to a log file.
  22. cmd = f"( mosquitto_sub -t {topic} -C 1 > {log} 2> /dev/null & )"
  23. self.assertRunOk(cmd)
  24. # We give some time to the subscriber process to settle.
  25. time.sleep(5)
  26. # We publish a message.
  27. self.assertRunOk(f"mosquitto_pub -t {topic} -m '{msg}'")
  28. # We give some more time to the subscriber process to write
  29. # the log and quit.
  30. time.sleep(5)
  31. # We check the log file contains our message.
  32. out, ret = self.emulator.run(f"cat {log}")
  33. self.assertEqual(ret, 0)
  34. self.assertEqual(out[0], msg)