test_nftables.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import os
  2. import infra.basetest
  3. class TestNftables(infra.basetest.BRTest):
  4. config = \
  5. """
  6. BR2_aarch64=y
  7. BR2_TOOLCHAIN_EXTERNAL=y
  8. BR2_TARGET_GENERIC_GETTY_PORT="ttyAMA0"
  9. BR2_LINUX_KERNEL=y
  10. BR2_LINUX_KERNEL_CUSTOM_VERSION=y
  11. BR2_LINUX_KERNEL_CUSTOM_VERSION_VALUE="6.1.46"
  12. BR2_LINUX_KERNEL_USE_CUSTOM_CONFIG=y
  13. BR2_LINUX_KERNEL_CUSTOM_CONFIG_FILE="board/qemu/aarch64-virt/linux.config"
  14. BR2_LINUX_KERNEL_NEEDS_HOST_OPENSSL=y
  15. BR2_PACKAGE_NFTABLES=y
  16. BR2_PACKAGE_PYTHON3=y
  17. BR2_ROOTFS_OVERLAY="{}"
  18. BR2_TARGET_ROOTFS_CPIO=y
  19. BR2_TARGET_ROOTFS_CPIO_GZIP=y
  20. # BR2_TARGET_ROOTFS_TAR is not set
  21. """.format(
  22. infra.filepath("tests/package/test_nftables/rootfs-overlay"))
  23. def nftables_test(self, prog="nft"):
  24. # Table/Chain names for the test
  25. nft_table = "br_ip_table"
  26. nft_chain = "br_ip_chain_in"
  27. # We flush all nftables rules, to start from a known state.
  28. self.assertRunOk(f"{prog} flush ruleset")
  29. # We create an ip table.
  30. self.assertRunOk(f"{prog} add table ip {nft_table}")
  31. # We should be able to list this table.
  32. list_cmd = f"{prog} list tables ip"
  33. output, exit_code = self.emulator.run(list_cmd)
  34. self.assertEqual(exit_code, 0)
  35. self.assertIn(nft_table, output[0])
  36. # We create an ip input chain in our table.
  37. cmd = f"{prog} add chain ip"
  38. cmd += f" {nft_table} {nft_chain}"
  39. cmd += " { type filter hook input priority 0 \\; }"
  40. self.assertRunOk(cmd)
  41. # We list our chain.
  42. cmd = f"{prog} list chain ip {nft_table} {nft_chain}"
  43. self.assertRunOk(cmd)
  44. # We add a filter rule to drop pings (icmp echo-requests) to
  45. # the 127.0.0.2 destination.
  46. cmd = f"{prog} add rule ip {nft_table} {nft_chain}"
  47. cmd += " ip daddr 127.0.0.2 icmp type echo-request drop"
  48. self.assertRunOk(cmd)
  49. # We list our rule.
  50. self.assertRunOk(f"{prog} list ruleset ip")
  51. # A ping to 127.0.0.1 is expected to work, because it's not
  52. # matching our rule. We expect 3 replies (-c), with 0.5s
  53. # internal (-i), and set a maximum timeout of 2s.
  54. ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 "
  55. self.assertRunOk(ping_cmd_prefix + "127.0.0.1")
  56. # A ping to 127.0.0.2 is expected to fail, because our rule is
  57. # supposed to drop it.
  58. ping_test_cmd = ping_cmd_prefix + "127.0.0.2"
  59. _, exit_code = self.emulator.run(ping_test_cmd)
  60. self.assertNotEqual(exit_code, 0)
  61. # We completely delete the table. This should also delete the
  62. # chain and the rule.
  63. self.assertRunOk(f"{prog} delete table ip {nft_table}")
  64. # We should no longer see the table in the list.
  65. output, exit_code = self.emulator.run(list_cmd)
  66. self.assertEqual(exit_code, 0)
  67. self.assertNotIn(nft_table, "\n".join(output))
  68. # Since we deleted the rule, the ping test command which was
  69. # supposed to fail earlier is now supposed to succeed.
  70. self.assertRunOk(ping_test_cmd)
  71. def boot_vm(self):
  72. img = os.path.join(self.builddir, "images", "rootfs.cpio.gz")
  73. kern = os.path.join(self.builddir, "images", "Image")
  74. self.emulator.boot(arch="aarch64",
  75. kernel=kern,
  76. kernel_cmdline=["console=ttyAMA0"],
  77. options=["-M", "virt",
  78. "-cpu", "cortex-a57",
  79. "-m", "256M",
  80. "-initrd", img])
  81. self.emulator.login()
  82. def test_run(self):
  83. self.boot_vm()
  84. # We check the program can execute.
  85. self.assertRunOk("nft --version")
  86. # We run the nftables test sequence using the default "nft"
  87. # user space configuration tool.
  88. self.nftables_test()
  89. # We run again the same test sequence using our simple nft
  90. # python implementation, to check the language bindings.
  91. self.nftables_test(prog="/root/nft.py")
  92. class TestNftablesInit(TestNftables):
  93. config = TestNftables.config + \
  94. """
  95. BR2_INIT_BUSYBOX=y
  96. """
  97. def test_run(self):
  98. self.boot_vm()
  99. # start with known state (rules from /etc/nftables.conf)
  100. self.assertRunOk("/etc/init.d/S35nftables reload")
  101. # Same concept as in TestNftables.nftables_test: The rules
  102. # should allow ping to 127.0.0.1, but not 127.0.0.2.
  103. ping_cmd_prefix = "ping -c 3 -i 0.5 -W 2 "
  104. self.assertRunOk(ping_cmd_prefix + "127.0.0.1")
  105. _, exit_code = self.emulator.run(ping_cmd_prefix + "127.0.0.2")
  106. self.assertNotEqual(exit_code, 0)
  107. # Stop should flush the rules, ping to both addresses should
  108. # work now.
  109. self.assertRunOk("/etc/init.d/S35nftables stop")
  110. self.assertRunOk(ping_cmd_prefix + "127.0.0.1")
  111. self.assertRunOk(ping_cmd_prefix + "127.0.0.2")
  112. # Start is essentially the same as reload, check that
  113. # 127.0.0.2 gets blocked again.
  114. self.assertRunOk("/etc/init.d/S35nftables start")
  115. _, exit_code = self.emulator.run(ping_cmd_prefix + "127.0.0.2")
  116. self.assertNotEqual(exit_code, 0)