-
ROS2(Cyclone DDS)๋ฅผ ๋คํธ์ํฌ๋ก ์ฐ๊ฒฐํ์ฌ ์คํํ๊ธฐROS2&MQTT 2025. 4. 4. 17:28
โ ๋ชฉํ: ROS2(Cyclone DDS)๋ฅผ ๋คํธ์ํฌ๋ก ์ฐ๊ฒฐํ์ฌ ์คํํ๊ธฐ
[ํผ๋ธ๋ฆฌ์ ๋ ธํธ๋ถ1] โโโโโโ [Cyclone DDS (P2P)] โโโโโโ [์๋ธ์คํฌ๋ผ์ด๋ฒ ๋ ธํธ๋ถ2]โROS2๋ MQTT์ ๋ค๋ฅด๊ฒ ๋ธ๋ก์ปค๊ฐ ์๊ณ , Peer-to-Peer(DDS) ๊ตฌ์กฐ์ด๋ค.
๊ทธ๋์ ๋คํธ์ํฌ ์ฐ๊ฒฐ ์กฐ๊ฑด์ด ์กฐ๊ธ ๋ ๊น๋ค๋กญ์ง๋ง, ์๋ง ์ค์ ํ๋ฉด MQTT๋ณด๋ค ๋ฎ์ ๋ ์ดํด์๋ฅผ ๋ณด์ฌ์ค๋ค
Ubuntu์์ ROS2 Humble ์ค์น (๊ณต์ ๋ฐฉ์, Ubuntu 22.04 ๊ธฐ์ค)
๐ฆ 1๋จ๊ณ: ์ค์น ์ ๊ธฐ๋ณธ ์ค์
sudo apt update && sudo apt install locales sudo locale-gen en_US en_US.UTF-8 sudo update-locale LC_ALL=en_US.UTF-8 LANG=en_US.UTF-8 export LANG=en_US.UTF-8
๐ 2๋จ๊ณ: ROS2 ํจํค์ง ์ ์ฅ์ ์ค์
sudo apt install software-properties-common sudo add-apt-repository universe sudo apt update && sudo apt install curl -y sudo curl -sSL https://raw.githubusercontent.com/ros/rosdistro/master/ros.asc | sudo tee /usr/share/keyrings/ros-archive-keyring.gpg > /dev/null
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/ros-archive-keyring.gpg] \ https://packages.ros.org/ros2/ubuntu $(. /etc/os-release && echo $UBUNTU_CODENAME) main" | \ sudo tee /etc/apt/sources.list.d/ros2.list > /dev/null
๐ฅ 3๋จ๊ณ: ROS2 Humble ์ค์น
sudo apt update sudo apt install ros-humble-desktop -y
๐ (๋์คํฌ ์ฌ์ ์์ ๊ฒฝ์ฐ ros-humble-ros-base๋ก ์ต์ ์ค์น๋ ๊ฐ๋ฅ)
4๋จ๊ณ: ROS2 ํ๊ฒฝ ๋ณ์ ์ค์
echo "source /opt/ros/humble/setup.bash" >> ~/.bashrc source ~/.bashrc
์ค์น ํ์ธ
ros2 --version # ์: ros2 0.9.8 (humble)
ros2 run demo_nodes_cpp talker # โ talker ๋ ธ๋๊ฐ ์คํ๋๋ฉด ์ฑ๊ณต!
colcon ๋ฐ ์์กด ํจํค์ง ์ค์น (๋น๋์ฉ)
sudo apt install python3-colcon-common-extensions -y
โ 1. Cyclone DDS ๋คํธ์ํฌ ํต์ ์กฐ๊ฑด
๊ธฐ๋ณธ ์ ์ :
- ROS2 ๋ ธ๋๋ Cyclone DDS๋ฅผ ํตํด ๋ฉํฐ์บ์คํธ UDP๋ฅผ ์ฌ์ฉํด์ ์๋ก ํ์
- ๋จ, ๋คํธ์ํฌ ์ค์ ์ด ๋ค๋ฅด๋ฉด ์๋ก ๋ชป ์ฐพ์ ์ ์๋ค
๐ ๊ผญ ์ง์ผ์ผ ํ ์กฐ๊ฑด:
์กฐ๊ฑด ์ค๋ช โ ๊ฐ์ ์๋ธ๋ท (์: 192.168.0.x) Multicast ํ์ ๊ฐ๋ฅํด์ผ ํจ โ ๋ฐฉํ๋ฒฝ ํ์ฉ UDP 7400 ~ 7500 ํฌํธ ๋ฒ์ ์ด๋ ค์ผ ํจ โ ๋ธ๋ฆฌ์ง ๋คํธ์ํฌ or ์ ์ ์ฐ๊ฒฐ Parallels, VirtualBox ์ฌ์ฉํ ๊ฒฝ์ฐ โ๋ธ๋ฆฌ์ง ๋ชจ๋โ ํ์ โ ๋์ผ ROS2 ๋ฒ์ & RMW ๋ ๋ค rmw_cyclonedds_cpp ์ฌ์ฉํ๋๋ก ์ค์
โ 2. Cyclone DDS ํต์ ํ์ธ ์คํ
A. ํผ๋ธ๋ฆฌ์ ๋ ธํธ๋ถ์์:
ros2 run demo_nodes_cpp talker
B. ์๋ธ์คํฌ๋ผ์ด๋ฒ ๋ ธํธ๋ถ์์:
ros2 run demo_nodes_cpp listener
์ ์ ์ฐ๊ฒฐ๋๋ฉด listener ์ชฝ์์ ๋ฉ์์ง ์ถ๋ ฅ๋จ
[publisher]
[subscriber]
โ 3. Cyclone DDS์์ ํต์ ์ด ์ ๋ ๊ฒฝ์ฐ ๋์ฒ๋ฒ
๐ ~/.ros/ros2/์ cyclonedds.xml ์ค์ ํ์ผ ๋ง๋ค๊ธฐ
<CycloneDDS> <Domain> <General> <NetworkInterfaceAddress>192.168.0.101</NetworkInterfaceAddress> <!-- ๋ณธ์ธ์ IP --> </General> <Discovery> <Peers> <Peer address="192.168.0.102"/> <!-- ์๋๋ฐฉ IP --> </Peers> </Discovery> </Domain> </CycloneDDS>
๊ทธ๋ฆฌ๊ณ ํ๊ฒฝ๋ณ์ ์ค์ :
export CYCLONEDDS_URI=file:///home/devjang/.ros/ros2/cyclonedds.xml
๋๋ .bashrc์ ์ถ๊ฐ!
โ 4. ROS2 ํผํฌ๋จผ์ค ์คํ์ ์ํ ์ฝ๋ ๊ตฌ์ฑ
์ญํ ์์ ํ์ผ์ค๋ช ํผ๋ธ๋ฆฌ์ dds_pub.py rclcpp::Publisher, dummy JSON ์ ์ก ์๋ธ์คํฌ๋ผ์ด๋ฒ dds_sub.py ์์ ํ latency ๊ณ์ฐ, ์์ค๋ฅ ์ธก์ ๊ฐ๋ฅ ๋น๋ ์์คํ colcon build + setup.py ์๊ฐํ visualize_result.ipynb๋ก ๋์ผํ๊ฒ ๋ถ์ ๊ฐ๋ฅ
setup.py์ package.xml์ ROS2์์ ํจํค์ง๋ฅผ ์ ์ํ๊ณ ๋น๋/๋ฐฐํฌํ ๋ ์ฌ์ฉํ๋ ํต์ฌ ๊ตฌ์ฑ ํ์ผ๋ค์ด๋ค, ๋ ๋ค ์ค์ํ ์ญํ ์ ํ์ง๋ง ์ฉ๋์ ํฌ๋งท์ด ๋ค๋ฅด๊ธฐ ๋๋ฌธ์ ํจ๊ป ์จ์ผํ๋ค.
[setup.py ์ฝ๋]
Python ๊ธฐ๋ฐ ROS2 ํจํค์ง์ ์ค์น ๋ฐ ์คํ ์คํฌ๋ฆฝํธ
๐น ์ญํ
- Python setuptools ๊ธฐ๋ฐ์ผ๋ก ROS2 ๋ ธ๋๋ฅผ ์ ์
- ์คํ ๊ฐ๋ฅํ ์คํฌ๋ฆฝํธ(ros2 run ...)๋ก ๋ฑ๋ก
- ์ค์นํ ํ์ผ, ์ด๋ฆ, ๋ฒ์ , ์คํ entrypoint ๋ฑ ์ง์
from setuptools import setup package_name = 'dds_test_py' setup( name=package_name, version='0.1.0', packages=[package_name], install_requires=['setuptools'], zip_safe=True, maintainer='Your Name', maintainer_email='you@example.com', description='Python ROS2 pub/sub test', license='Apache License 2.0', tests_require=['pytest'], entry_points={ 'console_scripts': [ 'dds_pub = dds_pub.dds_pub:main', 'dds_sub = dds_sub.dds_sub:main', ], }, )
๐บ ์ฃผ์: packages=[package_name] โ ์ด๊ฑด __init__.py ํฌํจ๋ ํด๋ ์ด๋ฆ๊ณผ ๊ฐ์์ผ ํ๋ค
[package.xml ์ฝ๋]
ROS2๊ฐ ํจํค์ง๋ฅผ ์ธ์ํ๊ณ , ์์กด์ฑ/๋ฉํ์ ๋ณด๋ฅผ ์ ์ํ๋ ํ์ผ
๐น ์ญํ
- ROS2์ ๋น๋ ์์คํ (colcon)์ด ์ด ํจํค์ง๋ฅผ ์ธ์ํ๊ฒ ํด์ค
- ์ด๋ค ํจํค์ง์ธ์ง ์ค๋ช (์ด๋ฆ, ๋ฒ์ , ์ ์ง๊ด๋ฆฌ์, ๋ผ์ด์ ์ค ๋ฑ)
- ์ด๋ค ROS2 ํจํค์ง๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์์กดํ๋์ง ์ ์ธ
<package format="3"> <name>dds_test_py</name> <version>0.1.0</version> <description>Python ROS2 pub/sub test</description> <maintainer email="you@example.com">Your Name</maintainer> <license>Apache 2.0</license> <!-- ํต์ฌ: Python ํจํค์ง๋ก ์ ์ธ --> <build_type>ament_python</build_type> <!-- ์คํ ์ ํ์ํ ์์กด์ฑ --> <exec_depend>rclpy</exec_depend> <exec_depend>std_msgs</exec_depend> </package>
โ ๋์ ๊ด๊ณ ์์ฝ
ํญ๋ชฉ package.xml setup.py ์ฌ์ฉ ๋์ ROS2 ์์คํ Python/colcon ์ค์น ๊ธฐ๋ฅ ํจํค์ง ์ ๋ณด, ROS ์์กด์ฑ Python ์ค์น/์คํ ์ค์ ํ์ ์ฌ๋ถ โ ROS2์์ ๋ฐ๋์ ํ์ โ Python ROS2 ๋ ธ๋์ ํ์ ๋น์ "์ฑ ํ์ง์ ๋ชฉ์ฐจ" "์ฑ ๋ด์ฉ๋ฌผ์ ์ค์น ์ค๋ช ์"
ํจ๊ป ์ฐ์ผ ๋์ ํ๋ฆ
- colcon build ๋ช
๋ น ์คํ ์
- package.xml ์ฝ๊ณ ์์กด์ฑ ํ์ธ
- Python ํจํค์ง์ธ ๊ฒฝ์ฐ setup.py๋ฅผ ์คํํด ๋น๋/์ค์น
- ๋น๋ ํ:
- ros2 run dds_test_py dds_pub โ setup.py์์ ๋ฑ๋กํ main() ํจ์ ์คํ
์คํ ์์
๋น๋:
colcon build --packages-select dds_test_py
์์ค:
source install/setup.bash
์คํ:
ros2 run dds_test_py dds_pub ros2 run dds_test_py dds_sub
โ ๋ชฉํ
๋ ธ๋ ๊ธฐ๋ฅ dds_pub.py id, timestamp, payload ํฌํจ ๋ฉ์์ง ์ ์ก + pub_log.csv ์ ์ฅ dds_sub.py ์์ ํ latency ๊ณ์ฐ + sub_log.csv, loss_result.csv ์ ์ฅ
๐ ํด๋ ๊ตฌ์กฐ (๊ธฐ๋ณธ)
ros2/ โโโ dds_pub/ โ โโโ __init__.py โ โโโ dds_pub.py โโโ dds_sub/ โ โโโ __init__.py โ โโโ dds_sub.py โโโ log/ โ โโโ dds_pub_log.csv โ โโโ dds_sub_log.csv โ โโโ loss_result.csv โโโ setup.py โโโ package.xml
[dds_pub.py (ํผ๋ธ๋ฆฌ์ ) ์ฝ๋]
import rclpy from rclpy.node import Node from std_msgs.msg import String import json, time, csv, os class DDSPublisher(Node): def __init__(self): super().__init__('dds_publisher') self.publisher_ = self.create_publisher(String, 'dds_topic', 10) self.timer_period = 0.1 # 10Hz self.timer = self.create_timer(self.timer_period, self.timer_callback) self.msg_id = 0 self.log_path = 'log/dds_pub_log.csv' os.makedirs('log', exist_ok=True) self.log_file = open(self.log_path, 'w', newline='') self.writer = csv.writer(self.log_file) self.writer.writerow(['id', 'timestamp', 'payload']) self.get_logger().info("DDS Publisher Started") def timer_callback(self): now = time.time() payload = { "id": self.msg_id, "timestamp": now, "data": {"sensor": "dds", "value": 123} } msg_str = json.dumps(payload) msg = String() msg.data = msg_str self.publisher_.publish(msg) self.writer.writerow([self.msg_id, now, msg_str]) self.msg_id += 1 def destroy_node(self): self.log_file.close() super().destroy_node() def main(args=None): rclpy.init(args=args) node = DDSPublisher() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info('Keyboard interrupt โ stopping.') finally: node.destroy_node() rclpy.shutdown()
[dds_sub.py (์๋ธ์คํฌ๋ผ์ด๋ฒ + latency + ์์ค๋ฅ )]
import rclpy from rclpy.node import Node from std_msgs.msg import String import json, time, csv, os class DDSSubscriber(Node): def __init__(self): super().__init__('dds_subscriber') self.subscription = self.create_subscription( String, 'dds_topic', self.listener_callback, 10) self.subscription # prevent unused variable warning self.received_ids = set() self.log_path = 'log/dds_sub_log.csv' self.loss_path = 'log/loss_result.csv' os.makedirs('log', exist_ok=True) self.log_file = open(self.log_path, 'w', newline='') self.writer = csv.writer(self.log_file) self.writer.writerow(['id', 'recv_time', 'latency_ms']) self.get_logger().info("DDS Subscriber Started") def listener_callback(self, msg): try: now = time.time() raw = json.loads(msg.data) msg_id = int(raw.get("id", -1)) sent_time = float(raw.get("timestamp", 0)) latency = (now - sent_time) * 1000 self.received_ids.add(msg_id) self.writer.writerow([msg_id, now, round(latency, 3)]) except Exception as e: self.get_logger().error(f"Error decoding message: {e}") def destroy_node(self): self.log_file.close() self.calculate_loss() super().destroy_node() def calculate_loss(self): try: with open('log/dds_pub_log.csv', newline='') as f: reader = csv.DictReader(f) pub_ids = set(int(row['id']) for row in reader) except FileNotFoundError: self.get_logger().warn("dds_pub_log.csv not found") return total_sent = len(pub_ids) total_recv = len(self.received_ids) loss = total_sent - total_recv loss_rate = (loss / total_sent) * 100 if total_sent > 0 else 0 with open(self.loss_path, 'w', newline='') as f: writer = csv.writer(f) writer.writerow(["total_sent", "total_received", "loss_count", "loss_rate(%)"]) writer.writerow([total_sent, total_recv, loss, round(loss_rate, 2)]) self.get_logger().info(f"์์ค๋ฅ : {loss_rate:.2f}% ({loss}/{total_sent})") def main(args=None): rclpy.init(args=args) node = DDSSubscriber() try: rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info('Keyboard interrupt โ stopping.') finally: node.destroy_node() rclpy.shutdown()
ํ์ฌ ๊ฒฝ๋ก์๋ ์์ง ์คํ ๊ฒฐ๊ณผ ๋ก๊ทธ ํ์ผ๋ค์ด ์กด์ฌํ์ง ์์์ ์๊ฐํ๋ฅผ ๋ฐ๋ก ์งํํ ์๋ ์์ผ๋ฉฐ ์คํ์ ๋๋ด๊ณ ๋ก๊ทธ ํ์ผ๋ค๋ง ์์ฑํ๋ฉด ๋ฐ๋ก ์ธ ์ ์๋๋ก, visualize_result.ipynb๋ฅผ ๊ตฌ์ฑํ์๋ค.
[visualize_result.ipynb ํ ํ๋ฆฟ]
import pandas as pd import matplotlib.pyplot as plt # ํ์ผ ๊ฒฝ๋ก pub_log_path = "log/dds_pub_log.csv" sub_log_path = "log/dds_sub_log.csv" loss_result_path = "log/loss_result.csv" # ํผ๋ธ๋ฆฌ์ ๋ก๊ทธ (์ ํ) try: pub_df = pd.read_csv(pub_log_path) print("์ด ํผ๋ธ๋ฆฌ์ ์:", len(pub_df)) except: print("โ ๏ธ ํผ๋ธ๋ฆฌ์ ๋ก๊ทธ ์์") # ์๋ธ์คํฌ๋ผ์ด๋ฒ ๋ก๊ทธ (latency) sub_df = pd.read_csv(sub_log_path) plt.figure(figsize=(10, 4)) plt.plot(sub_df["recv_time"], sub_df["latency_ms"], label="Latency (ms)") plt.xlabel("Receive Time (s)") plt.ylabel("Latency (ms)") plt.title("ROS2 Latency Over Time") plt.grid(True) plt.legend() plt.show() # ์์ค๋ฅ ์ถ๋ ฅ loss_df = pd.read_csv(loss_result_path) print("\n๐ Message Loss Summary:") display(loss_df)
์คํ ์์ & ๋ช ๋ น์ด
ํฐ๋ฏธ๋์์ ROS2 ํผ๋ธ๋ฆฌ์ ์คํ
ros2 run dds_test_py dds_pub
์คํ ์ข ๋ฃํ๋ ค๋ฉด: Ctrl + C
๋ค๋ฅธ ํฐ๋ฏธ๋์์ ROS2 ์๋ธ์คํฌ๋ผ์ด๋ฒ ์คํ
ros2 run dds_test_py dds_sub
์คํ ์ข ๋ฃํ๋ ค๋ฉด: Ctrl + C
์ข ๋ฃ ์ ์๋์ผ๋ก loss_result.csv ์์ฑ๋จ
๋ก๊ทธ ํ์ผ ํ์ธ
ls log/ # ์์: dds_pub_log.csv, dds_sub_log.csv, loss_result.csv
์๊ฐํ
jupyter notebook
โ visualize_result.ipynb ์ด๊ณ latency plot + ์์ค๋ฅ ํ ํ์ธ
'ROS2&MQTT' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
MQTT - 1, ๊ธฐ๋ณธํต์ (0) 2025.04.06 What is ROS2? (1) 2025.03.22