Работать с cyphal на питоне можно с помощью библиотеки PyCyphal.
Пример класса Controller, в котором реализован паблишер и сабскрайбер. Паблишер будет отправлять напряжение (в вольтах), которое рассчитывается исходя из угла датчика мотора, получаемого сабскрайбером.
Программа, написанная на ардуино, которая публикует значение угла датчика мотора и подает в качестве управляющего воздействия напряжение, получаемое "сверху", находится в примерах на ардуино - ControlVoltage_SendAngle_cyphal
Само приложение Cyphal зависит от стандартных определений DSDL, расположенных в пространстве имен uavcan. Стандартное пространство имен является частью регламентированных пространств имен, поддерживаемых проектом OpenCyphal. Скачайте его с github:
import pycyphalimport pycyphal.applicationfrom pycyphal.application import heartbeat_publisherimport uavcan.si.unit.voltage import uavcan.si.sample.angleimport uavcan.nodeimport asyncioimport osimport sysdefsign(x):if x>0:return1elif x<0:return-1else:return0classController: REGISTER_FILE ="controller.db"def__init__(self) ->None: node_info = uavcan.node.GetInfo_1.Response( software_version=uavcan.node.Version_1(major=1, minor=0), name="org.opencyphal.pycyphal.cyphal_test", ) self.voltage_output =0 self._node = pycyphal.application.make_node(node_info, Controller.REGISTER_FILE) self._node.heartbeat_publisher.mode = uavcan.node.Mode_1.OPERATIONAL # type: ignore self._node.heartbeat_publisher.vendor_specific_status_code = os.getpid()%100 self._sub_ang = self._node.make_subscriber(uavcan.si.sample.angle.Scalar_1, 1111) self._pub_volt = self._node.make_publisher(uavcan.si.unit.voltage.Scalar_1, 1112) self._node.start()defclose(self) ->None:""" This will close all the underlying resources down to the transport interface and all publishers/servers/etc. All pending tasks such as serve_in_background()/receive_in_background() will notice this and exit automatically. """ self._node.close()asyncdefrun(self) ->None:""" The main method that runs the business logic. It is also possible to use the library in an IoC-style by using receive_in_background() for all subscriptions if desired. """ angle_setpoint =0.0asyncfor m, _metadata in self._sub_ang:assertisinstance(m, uavcan.si.sample.angle.Scalar_1)print("angle:",m.radian) err = angle_setpoint - m.radian self.voltage_output =-5*sign(err)await self._pub_volt.publish(uavcan.si.unit.voltage.Scalar_1(self.voltage_output))asyncdefmain() ->None:#logging.root.setLevel(logging.INFO) app =Controller()try:await app.run()exceptKeyboardInterrupt:passfinally: app.close()if__name__=="__main__": asyncio.run(main())