MQTT - 1, ๊ธฐ๋ณธํต์
๐ท 1. MQTT ์คํ ํ๊ฒฝ ์ค์
๐ [1-1] Mosquitto ๋ธ๋ก์ปค ์ค์ (mosquitto.conf)
- Mosquitto๋ ์คํ์์ค MQTT ๋ธ๋ก์ปค
- ์ค์ ํ์ผ: config/mosquitto.conf
listener 1883 # ํฌํธ ์ค์
allow_anonymous true # ์ธ์ฆ ์์ด ์ฌ์ฉ
persistence true # ์ฌ์์ ํ์๋ ๋ฉ์์ง ์ ์ฅ
persistence_location /mosquitto/data/
log_dest stdout # ๋ก๊ทธ ์ถ๋ ฅ
๐ [1-2] Docker๋ก ๋ธ๋ก์ปค ์คํ (์ต์ )
docker run -it -p 1883:1883 -v ./config:/mosquitto/config -v ./data:/mosquitto/data eclipse-mosquitto
๐ท 1. mqttPub_from_csv.py ์ฝ๋ ์ค๋ช (Publisher)
๐ ์ ์ฒด ๋ชฉ์
CSV ํ์ผ์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด, ์ผ์ ๊ฐ๊ฒฉ์ผ๋ก MQTT ๋ธ๋ก์ปค๋ฅผ ํตํด ๋ฉ์์ง๋ฅผ ์ ์กํ๊ณ , ๊ทธ ๋ด์ฉ์ ๋ก์ปฌ ๋ก๊ทธ ํ์ผ์๋ ์ ์ฅํจ.
๐ ์ฝ๋ ๋จ๊ณ๋ณ ์ค๋ช
### 1. ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ฐ ๊ฒฝ๋ก ์ค์
import time, json, csv
import paho.mqtt.client as mqtt
import os
- paho.mqtt.client: MQTT ํต์ ์ ์ํ ํ์ด์ฌ ํด๋ผ์ด์ธํธ
- json, csv: ๋ฉ์์ง ํฌ๋งท๊ณผ ๋ฐ์ดํฐ ๋ก๋ฉ
- os: ๊ฒฝ๋ก ํ์ธ ๋ฐ ํด๋ ์์ฑ
### 2. ๊ฒฝ๋ก ๋ฐ ์ค์ ๋ณ์
CSV_PATH = "/app/industrial_robot_control_6G_network.csv"
LOG_PATH = "/app/pub_log.csv"
MQTT_BROKER = "mqtt-broker"
MQTT_PORT = 1883
MQTT_TOPIC = "test/latency"
PUBLISH_INTERVAL = 0.1โ
- CSV_PATH: ๋ฐํํ ๋ฐ์ดํฐ๊ฐ ๋ด๊ธด CSV ํ์ผ
- LOG_PATH: ๋ฐํ ๋ก๊ทธ๋ฅผ ์ ์ฅํ ๊ฒฝ๋ก
- MQTT_BROKER: MQTT ๋ธ๋ก์ปค ์ฃผ์ (mqtt-broker๋ Docker ๋คํธ์ํฌ alias๋ก ์ฌ์ฉ๋ ์๋ ์์. ์ค์ ํ๊ฒฝ์์๋ 192.168.x.x ๋ฑ์ผ๋ก ๋ณ๊ฒฝ ๊ฐ๋ฅ)
- PUBLISH_INTERVAL: ๋ฉ์์ง ๋ฐํ ์ฃผ๊ธฐ (์ด ๋จ์, ์: 0.1์ด → 10Hz)
### 3. MQTT ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ๋ฐ ์ฐ๊ฒฐ
client = mqtt.Client()
client.connect(MQTT_BROKER, MQTT_PORT)
client.loop_start()
msg_id = 0
- ํด๋ผ์ด์ธํธ๋ฅผ ์์ฑํ๊ณ ๋ธ๋ก์ปค์ ์ฐ๊ฒฐํจ.
- loop_start()๋ ๋น๋๊ธฐ๋ก ๋ฉ์์ง ์ก์์ ์ฒ๋ฆฌ๋ฅผ ์์ํจ.
- msg_id๋ ๋ฉ์์ง ๋ฒํธ๋ฅผ ๊ด๋ฆฌํ๋ ์นด์ดํฐ (์์ค๋ฅ ๋ถ์์ ์ฐ์)
### 4. MQTT ํด๋ผ์ด์ธํธ ์ด๊ธฐํ ๋ฐ ์ฐ๊ฒฐ
os.makedirs(os.path.dirname(LOG_PATH), exist_ok=True)
- ๋ก๊ทธ ๋๋ ํ ๋ฆฌ๊ฐ ์์ผ๋ฉด ์์ฑ (/app ๋ด๋ถ๋ Docker ์ปจํ ์ด๋ ๊ธฐ์ค)
### 5. ๋ก๊ทธ ํ์ผ ์ด๊ธฐ & ํค๋ ์ฐ๊ธฐ
with open(LOG_PATH, "w", newline='') as logfile:
log_writer = csv.writer(logfile)
log_writer.writerow(["id", "timestamp", "payload"])
- pub_log.csv ์์ฑ ๋ฐ ๋ก๊ทธ์ฉ CSV writer ์ค๋น
### 6. CSV ํ์ผ์ ํ ์ค์ฉ ์ฝ์ด์ MQTT๋ก ์ ์ก
with open(CSV_PATH, newline='') as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
now = time.time()
msg = {
"id": msg_id,
"timestamp": now,
"data": row,
}
payload = json.dumps(msg)
client.publish(MQTT_TOPIC, payload)
...
log_writer.writerow([msg_id, now, payload])
msg_id += 1
time.sleep(PUBLISH_INTERVAL)
- CSV์์ ํ ์ค์ฉ ์ฝ๊ณ
- ๊ฐ row๋ฅผ ํฌํจํ JSON ๋ฉ์์ง๋ฅผ ์์ฑ
- ๋ฉ์์ง ID์ timestamp๋ฅผ ํฌํจํ์ฌ test/latency ํ ํฝ์ผ๋ก MQTT ๋ฐํ
- ๋ฐํ ํ ํด๋น ๋ฉ์์ง๋ฅผ ๋ก๊ทธ๋ก ๊ธฐ๋ก
- ์ง์ ๋ ์ฃผ๊ธฐ(0.1์ด)๋ก ์ ์ก ๋ฐ๋ณต
### 7. ์คํ ์ข ๋ฃ ๋ก๊ทธ
print("[mqttPub_from_csv] Finished publishing.")
- ์ ์ฒด ๋ฐํ์ด ๋๋ฌ์ ๋ ์ถ๋ ฅ
๐ ๋คํธ์ํฌ ๊ธฐ๋ฐ ์คํ์์์ ์คํ ๋ฐฉ๋ฒ
ํผ๋ธ๋ฆฌ์ ์ ๋ธ๋ก์ปค๋ ๊ฐ์ ๋คํธ์ํฌ ๋ด์์ ํต์ ๊ฐ๋ฅํด์ผ ํจ.
๐ฅ๏ธ ๋ก์ปฌ ๋จธ์ 1 (๋ธ๋ก์ปค ์คํ):
mosquitto -c config/mosquitto.conf
๋๋ Docker ์ฌ์ฉ ์:
docker run -it -p 1883:1883 -v $PWD/config:/mosquitto/config eclipse-mosquitto
๐ฅ๏ธ ๋จธ์ 2 (ํผ๋ธ๋ฆฌ์ ์คํ):
๋ธ๋ก์ปค๊ฐ ์คํ ์ค์ธ ์ปดํจํฐ์ IP๊ฐ ์๋ฅผ ๋ค์ด 192.168.0.42๋ผ๋ฉด:
MQTT_BROKER = "192.168.0.42"
์ผ๋ก ์์ ํ ์คํ:
python3 mqttPub_from_csv.py
๐ ๋ฐํ ์ ๊ฐ์กฐํ ํคํฌ์ธํธ
ํฌ์ธํธ | ์ค๋ช |
๋คํธ์ํฌ ๊ธฐ๋ฐ ํต์ | ๋ธ๋ก์ปค์ ํผ๋ธ๋ฆฌ์ ๋ ์๋ก ๋ค๋ฅธ ์ฅ๋น์์ ๋์ ๊ฐ๋ฅ |
๋ฉ์์ง ๊ตฌ์กฐ | id, timestamp, data๋ก ๊ตฌ์ฑ → ์ ๋ ๋ถ์ ๊ฐ๋ฅ |
์ ์ก ๊ฐ๊ฒฉ | ์คํ ๋ชฉ์ ์ ๋ฐ๋ผ ์์ ๋กญ๊ฒ ์ค์ ๊ฐ๋ฅ |
๋ก๊ทธ ๊ธฐ๋ก | ์ค์๊ฐ ์คํ ํ latency, ์์ค๋ฅ ๋ถ์์ ํ์ฉ |
์ ์ฐํ ๋ฐ์ดํฐ ํฌ๊ธฐ ์กฐ์ | payload ํฌ๊ธฐ ์คํ์ ์ด์ ๋ฒ์ ์ฝ๋์ ํฌํจ ๊ฐ๋ฅ (generate_fixed_size_payload() ํ์ฉ) |
๐ท2. mqttSub.py ์ฝ๋ ์ค๋ช (Subscriber)
๐ ์ ์ฒด ๋ชฉ์
- MQTT ๋ธ๋ก์ปค์์ ๋ฉ์์ง๋ฅผ ์์
- ๋ฉ์์ง๋ง๋ค latency ์ธก์
- ๋ก๊ทธ ๊ธฐ๋ก (sub_log.csv)
- ์คํ ์ข ๋ฃ ์ ์์ค๋ฅ ๊ณ์ฐ (loss_result.csv)
๐ ์ฝ๋ ๋จ๊ณ๋ณ ์ค๋ช
### 1. ๋ผ์ด๋ธ๋ฌ๋ฆฌ ๋ฐ ์ค์ ๊ฒฝ๋ก
import time, json, csv
import paho.mqtt.client as mqtt
import os
### 2. ํ์ผ ๊ฒฝ๋ก ๋ฐ ๋ณ์ ์ด๊ธฐํ
PUB_LOG_PATH = "/app/pub_log.csv"
SUB_LOG_PATH = "/app/sub_log.csv"
LOSS_LOG_PATH = "/app/loss_result.csv"
- ํผ๋ธ๋ฆฌ์ ๊ฐ ๊ธฐ๋กํ ๋ฉ์์ง ๋ก๊ทธ๋ฅผ ์ฝ์ด ์์ค๋ฅ ๋ถ์์ ํ์ฉ
count = 0
total_latency = 0
received = 0
received_ids = set()
- ์์ ํ ๋ฉ์์ง ๊ฐ์, ID ๋ชฉ๋ก ๋ฑ์ ์ถ์
### 3. ์์ ๋ก๊ทธ ํ์ผ ์์ฑ ๋ฐ ํค๋ ์์ฑ
os.makedirs("/app", exist_ok=True)
csv_file = open(SUB_LOG_PATH, "w", newline="")
writer = csv.writer(csv_file)
writer.writerow(["id", "timestamp", "latency_ms"])
- ์์ ๋ก๊ทธ ํ์ผ์ /app/sub_log.csv๋ก ์์ฑํ๊ณ ํค๋๋ฅผ ๊ธฐ๋ก
### 4. MQTT ์์ ์ฒ๋ฆฌ ํจ์ ์ ์
def on_message(client, userdata, msg):
- ์์ ์ ํธ์ถ๋จ
- payload ๋์ฝ๋ฉ → latency ๊ณ์ฐ
latency = (time.time() - raw["timestamp"]) * 1000
- ํผ๋ธ๋ฆฌ์ ๊ฐ ๊ธฐ๋กํ timestamp์ ํ์ฌ ์๊ฐ์ ์ฐจ์ด๋ก latency(ms) ๊ณ์ฐ
writer.writerow([msg_id, time.time(), round(latency, 3)])
- sub_log.csv์ ์ ์ฅ
received_ids.add(int(msg_id))
- ๋ฉ์์ง ID ์ ์ฅ → ์์ค๋ฅ ๊ณ์ฐ์ ํ์ฉ๋จ
### 5. ์์ค๋ฅ ๊ณ์ฐ ํจ์
def calculate_loss():
- ์์ ์ด ๋๋ ๋ค ํผ๋ธ๋ฆฌ์ ๋ก๊ทธ์ ์์ ๋ก๊ทธ๋ฅผ ๋น๊ตํ์ฌ
- ์ ์ก ๊ฐ์, ์์ ๊ฐ์, ์์ค ๊ฐ์, ์์ค๋ฅ ์ ๊ณ์ฐ
loss_rate = (loss_count / total_sent) * 100
### 6. ๊ตฌ๋ ์ค์ ๋ฐ ์คํ
client = mqtt.Client()
client.connect("mqtt-broker", 1883)
client.subscribe("test/latency")
client.on_message = on_message
- ํผ๋ธ๋ฆฌ์ ์ ๋์ผํ ํ ํฝ test/latency์ ๊ตฌ๋
client.loop_forever()
- ๋ฌดํ ๋ฃจํ ํํ๋ก ์คํ → ์คํ ์ข ๋ฃ ์ Ctrl+C
### 7. ์ข ๋ฃ ํ ๋ก๊ทธ ๋ฐ ์์ค๋ฅ ์ถ๋ ฅ
finally:
csv_file.close()
calculate_loss()
๐ ๋คํธ์ํฌ ๊ธฐ๋ฐ ์คํ๋ฒ (Subscriber)
๐ฅ๏ธ ๋ค๋ฅธ ์ฅ๋น์์ ์คํํ ๊ฒฝ์ฐ:
- mqttSub.py์์ ๋ธ๋ก์ปค IP ์์ :
client.connect("192.168.0.42", 1883) # ๋ธ๋ก์ปค ์คํ ์ค์ธ ์ฅ๋น์ IP
2. ์คํ:
python3 mqttSub.py
๐ ์ฃผ์: ํผ๋ธ๋ฆฌ์ ๊ฐ ๋ณด๋ด๊ธฐ ์์ํ ํ ์คํํด๋ ๋์ง๋ง, ๋์ ์คํ์ด ์ด์์
๋ก๊ทธ ์ ์ฅ์ /app/sub_log.csv, ์์ค๋ฅ ์ /app/loss_result.csv์ ์ ์ฅ๋จ
โ ์คํ ๊ฒฐ๊ณผ ๋ถ์ ํ๋ฆ ์ ๋ฆฌ
ํ์ผ | ๋ด์ฉ |
pub_log.csv | ๋ฐํ๋ ๋ฉ์์ง ๋ก๊ทธ (id, timestamp, payload) |
sub_log.csv | ์์ ๋ ๋ฉ์์ง ๋ก๊ทธ (id, timestamp, latency_ms) |
loss_result.csv | ์ด ๋ฐ์ก, ์์ , ์์ค๋ฅ ์ ๋ณด |
pub_cpu.csv, sub_cpu.csv | ๊ฐ๊ฐ์ CPU ์ฌ์ฉ๋ฅ (์คํ ๋ถํ ์ธก์ ๋ชฉ์ ) |
๐ฌ ๋ถ์ ์์ (calculate_loss.py ๊ฒฐ๊ณผ ์์)
์ ์ก ์: 1000, ์์ ์: 995, ์์ค ์: 5, ์์ค๋ฅ : 0.50%
loss_result.csv ์์:
total_sent | total_received | loss_count | loss_rate(%) |
1000 | 995 | 5 | 0.5 |
๐ฏ ๋ฐํ ์ ๊ฐ์กฐ ํฌ์ธํธ ์ ๋ฆฌ
๊ตฌ๊ฐ | ๊ฐ์กฐ ๋ด์ฉ |
Publisher | payload ํฌ๊ธฐ, ์ ์ก ์ฃผ๊ธฐ, ๋ก๊ทธ ๊ธฐ๋ฐ ์คํ ๋ถ์ ๊ฐ๋ฅ |
Subscriber | latency, ์ค์๊ฐ ์์ , ์์ค๋ฅ ์๋ ๊ณ์ฐ |
๋ถ์ | CSV ๊ธฐ๋ฐ์ผ๋ก ๊ฐ๊ด์ ์ธ ์งํ ๋์ถ ๊ฐ๋ฅ |
ํ์ฅ์ฑ | ๋คํธ์ํฌ ๊ธฐ๋ฐ ์คํ, ๋ค์ํ ์ฅ๋น/ํ๊ฒฝ์์ ์คํ ๊ฐ๋ฅ |
pub_cpu_logger.py / sub_cpu_logger.py ์ค๋ช (CPU ๋ก๊น ์ฉ)
๐ ๊ณตํต ๋ชฉ์
- psutil ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ฌ์ฉํด CPU ์ฌ์ฉ๋ฅ ์ 1์ด ๋จ์๋ก ์ธก์
- CSV๋ก ์ ์ฅํ์ฌ ์๊ฐํ/๋น๊ต ๋ถ์์ ํ์ฉ
1๏ธโฃ pub_cpu_logger.py (ํผ๋ธ๋ฆฌ์ ์ธก CPU ๋ก๊ฑฐ)
import time
import csv
import psutil
import os
print("[pub-cpu] Starting CPU logging...")
log_path = "/app/pub_cpu.csv"
with open(log_path, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "cpu_percent"])
try:
while True:
cpu = psutil.cpu_percent(interval=1) # 1์ด๋ง๋ค CPU ์ธก์
timestamp = time.time()
writer.writerow([timestamp, cpu])
print(f"[pub-cpu] {timestamp:.2f}, {cpu:.2f}%")
f.flush()
except KeyboardInterrupt:
print("[pub-cpu] Logging stopped.")
2๏ธโฃ sub_cpu_logger.py (์๋ธ์คํฌ๋ผ์ด๋ฒ ์ธก CPU ๋ก๊ฑฐ)
import psutil
import time
import csv
log_path = "/app/sub_cpu.csv"
with open(log_path, mode="w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "cpu_percent"])
while True:
cpu = psutil.cpu_percent(interval=1)
writer.writerow([time.time(), cpu])
f.flush()
์ฐจ์ด์ : sub_cpu_logger.py๋ ๊ฐ๋จํ ๊ตฌ์ฑ๋์ด ์์ง๋ง ๊ธฐ๋ฅ์ ๋์ผ
๐ ๊ฒฐ๊ณผ ํ์ผ ๊ตฌ์กฐ
ํ์ผ๋ช | ๋ด์ฉ |
pub_cpu.csv | ํผ๋ธ๋ฆฌ์ ์ธก ์๊ฐ๋๋ณ CPU ์ฌ์ฉ๋ฅ |
sub_cpu.csv | ์๋ธ์คํฌ๋ผ์ด๋ฒ ์ธก ์๊ฐ๋๋ณ CPU ์ฌ์ฉ๋ฅ |
timestamp,cpu_percent
1712320764.89,4.3
1712320765.89,5.1
...
๐ก ์คํ ๋ฐฉ๋ฒ (์๋ก ๋ค๋ฅธ ํฐ๋ฏธ๋์์ ๋ณ๋ ฌ ์คํ)
ํผ๋ธ๋ฆฌ์ ์ธก:
python3 pub_cpu_logger.py
์๋ธ์คํฌ๋ผ์ด๋ฒ ์ธก:
python3 sub_cpu_logger.py
์คํ์ด ๋๋ฌ์ผ๋ฉด Ctrl+C๋ก ์ข
๋ฃ ๊ฐ๋ฅ
(๋ฐ์ดํฐ๋ ์ค์๊ฐ์ผ๋ก ์ ์ฅ๋๋ฉฐ, flush ์ฌ์ฉ๋จ)