#!/usr/bin/env python3
"""
Meshtastic Bot - odpowiada na komendy wysyłane przez radio LoRa
Połączenie TCP do węzła na adresie 10.10.3.61
"""
import meshtastic
import meshtastic.tcp_interface
from pubsub import pub
import time
import logging
import os
# Konfiguracja logowania
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class MeshtasticBot:
def __init__(self, hostname=None, port=None):
"""
Inicjalizacja bota
hostname: adres IP węzła Meshtastic (domyślnie z MESHTASTIC_HOST lub '10.10.3.61')
port: port TCP (domyślnie z MESHTASTIC_PORT lub 4403)
"""
self.interface = None
self.hostname = hostname or os.getenv('MESHTASTIC_HOST', '10.10.3.61')
self.port = int(port or os.getenv('MESHTASTIC_PORT', '4403'))
self.commands = {
'!help': self.cmd_help,
'!ping': self.cmd_ping,
'!info': self.cmd_info,
'!uptime': self.cmd_uptime,
'!nodes': self.cmd_nodes,
'!signal': self.cmd_signal
}
self.start_time = time.time()
def cmd_help(self, packet, interface):
"""Wyświetla dostępne komendy"""
help_text = "Dostępne komendy:\n"
help_text += "!help - lista komend\n"
help_text += "!ping - test połączenia\n"
help_text += "!info - info o bocie\n"
help_text += "!uptime - czas działania\n"
help_text += "!nodes - liczba węzłów\n"
help_text += "!signal - poziom sygnału"
return help_text
def cmd_ping(self, packet, interface):
"""Odpowiada pong"""
return "Pong! 🏓"
def cmd_info(self, packet, interface):
"""Informacje o bocie"""
node_id = interface.myInfo.my_node_num if interface.myInfo else "unknown"
return f"Meshtastic Bot v1.0\nNode: {node_id}\nIP: {self.hostname}"
def cmd_uptime(self, packet, interface):
"""Czas działania bota"""
uptime_seconds = int(time.time() - self.start_time)
hours = uptime_seconds // 3600
minutes = (uptime_seconds % 3600) // 60
return f"Uptime: {hours}h {minutes}m"
def cmd_nodes(self, packet, interface):
"""Liczba węzłów w sieci"""
node_count = len(interface.nodes) if interface.nodes else 0
return f"Węzłów w sieci: {node_count}"
def cmd_signal(self, packet, interface):
"""Poziom sygnału ostatniego pakietu"""
rssi = packet.get('rxRssi', '???')
snr = packet.get('rxSnr', '???')
return f"Poziom sygnału:\nRSSI: {rssi} dBm\nSNR: {snr} dB"
def on_receive(self, packet, interface):
"""Callback wywoływany przy odbiorze pakietu"""
try:
# Sprawdź czy to wiadomość tekstowa
if 'decoded' in packet and 'text' in packet['decoded']:
text = packet['decoded']['text']
sender_id = packet.get('fromId', 'unknown')
logger.info(f"Otrzymano wiadomość od {sender_id}: {text}")
# Sprawdź czy wiadomość zawiera komendę
# Usuń spacje po ! (np. "! help" -> "!help")
normalized_text = text.strip().replace('! ', '!')
command = normalized_text.lower().split()[0]
if command in self.commands:
logger.info(f"Wykonuję komendę: {command}")
response = self.commands[command](packet, interface)
# Wyślij odpowiedź do nadawcy
interface.sendText(response, destinationId=sender_id)
logger.info(f"Wysłano odpowiedź do {sender_id}")
except Exception as e:
logger.error(f"Błąd przetwarzania wiadomości: {e}")
def on_connection(self, interface, topic=pub.AUTO_TOPIC):
"""Callback wywoływany przy połączeniu z urządzeniem"""
logger.info("Połączono z urządzeniem Meshtastic")
logger.info(f"Node ID: {interface.myInfo.my_node_num if interface.myInfo else 'unknown'}")
def start(self):
"""Uruchom bota"""
try:
# Subskrybuj zdarzenia
pub.subscribe(self.on_receive, "meshtastic.receive.text")
pub.subscribe(self.on_connection, "meshtastic.connection.established")
# Połącz z urządzeniem przez TCP
logger.info(f"Łączenie z węzłem Meshtastic {self.hostname}:{self.port}...")
self.interface = meshtastic.tcp_interface.TCPInterface(
hostname=self.hostname,
portNumber=self.port
)
logger.info("Bot uruchomiony. Naciśnij Ctrl+C aby zakończyć.")
# Pętla główna
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("\nZatrzymywanie bota...")
except ConnectionRefusedError:
logger.error(f"Nie można połączyć się z {self.hostname}:{self.port}")
logger.error("Sprawdź czy węzeł jest włączony i dostępny w sieci")
except Exception as e:
logger.error(f"Błąd: {e}")
finally:
if self.interface:
self.interface.close()
logger.info("Bot zatrzymany.")
if __name__ == "__main__":
# Uruchom bota - hostname i port pobrane ze zmiennych środowiskowych
# MESHTASTIC_HOST i MESHTASTIC_PORT lub domyślne wartości
bot = MeshtasticBot()
bot.start()