sample_python_dbus_fast.py 1.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. import asyncio
  2. from dbus_fast.aio import MessageBus
  3. from dbus_fast.service import ServiceInterface, method
  4. import dbus_fast.introspection as intr
  5. from dbus_fast import BusType
  6. class SampleInterface(ServiceInterface):
  7. def __init__(self):
  8. super().__init__('test.interface')
  9. @method()
  10. def Ping(self):
  11. pass
  12. @method()
  13. def ConcatStrings(self, what1: 's', what2: 's') -> 's': # noqa: F821
  14. return what1 + what2
  15. async def main():
  16. bus_name = 'dbus.fast.sample'
  17. obj_path = '/test/path'
  18. bus = await MessageBus(bus_type=BusType.SYSTEM).connect()
  19. bus2 = await MessageBus(bus_type=BusType.SYSTEM).connect()
  20. await bus.request_name(bus_name)
  21. service_interface = SampleInterface()
  22. bus.export(obj_path, service_interface)
  23. introspection = await bus2.introspect(bus_name, obj_path)
  24. assert type(introspection) is intr.Node
  25. obj = bus2.get_proxy_object(bus_name, obj_path, introspection)
  26. interface = obj.get_interface(service_interface.name)
  27. result = await interface.call_ping()
  28. assert result is None
  29. result = await interface.call_concat_strings('hello ', 'world')
  30. assert result == 'hello world'
  31. asyncio.run(main())