Files
tasmota-skill/scripts/tasmota-discovery.py

166 lines
5.5 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Tasmota Device Discovery
Scans network for Tasmota devices by:
1. HTTP-based detection (port 80)
2. Checking for Tasmota JSON API response
3. Looking for Tasmota user-agent in responses
"""
import socket
import subprocess
import concurrent.futures
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError
import json
import time
# Network configuration
SUBNET = "192.168.1.0/24"
TIMEOUT = 2 # seconds
def get_local_ip():
"""Get local IP to determine subnet"""
try:
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
s.close()
return ip
except:
return "192.168.1.148"
def get_subnet_ips():
"""Generate list of IPs in subnet"""
local_ip = get_local_ip()
parts = local_ip.split('.')
base = f"{parts[0]}.{parts[1]}.{parts[2]}"
return [f"{base}.{i}" for i in range(1, 255)]
def check_http(ip):
"""Check if IP has HTTP service that might be Tasmota"""
try:
# Try basic HTTP request first
req = Request(f"http://{ip}/", headers={'User-Agent': 'Tasmota-Scanner'})
start = time.time()
with urlopen(req, timeout=TIMEOUT) as response:
elapsed = time.time() - start
data = response.read(5000).decode('utf-8', errors='ignore')
headers = dict(response.headers)
# Check for Tasmota indicators
is_tasmota = False
evidence = []
# Check for Tasmota-specific headers/content
if 'tasmota' in headers.get('Server', '').lower():
is_tasmota = True
evidence.append(f"Server header: {headers.get('Server')}")
# Check content for Tasmota patterns
if 'tasmota' in data.lower():
is_tasmota = True
evidence.append("Tasmota keyword in content")
# Check for known Tasmota page titles/elements
if 'Status' in data and 'tasmota' in data.lower():
is_tasmota = True
evidence.append("Tasmota status page detected")
# Try Tasmota JSON API
try:
req_api = Request(f"http://{ip}/cm?cmnd=status%200")
with urlopen(req_api, timeout=1) as api_response:
api_data = api_response.read(1000).decode('utf-8', errors='ignore')
if 'tasmota' in api_data.lower() or 'StatusSNS' in api_data or 'StatusNET' in api_data:
is_tasmota = True
evidence.append("Tasmota JSON API response")
except:
pass
if is_tasmota:
try:
title = data.split('<title>')[1].split('</title>')[0].strip()
except:
title = "N/A"
return {
'ip': ip,
'port': 80,
'title': title,
'server': headers.get('Server', 'Unknown'),
'evidence': evidence,
'elapsed': elapsed
}
return None
except (URLError, HTTPError, socket.timeout, ConnectionResetError):
return None
except Exception as e:
return None
def ping_scan(ip):
"""Quick ping check to see if host is up"""
try:
result = subprocess.run(
['ping', '-c', '1', '-W', '1', ip],
capture_output=True,
timeout=2
)
return result.returncode == 0
except:
return False
def discover_tasmota():
"""Main discovery function"""
print(f"🔍 Scanning network for Tasmota devices...")
print(f"📍 Local subnet: {SUBNET}")
ips = get_subnet_ips()
print(f"📡 Checking {len(ips)} IP addresses...")
tasmota_devices = []
# First, quick ping sweep to find live hosts
print("\n1⃣ Ping sweep (finding live hosts)...")
live_hosts = []
with concurrent.futures.ThreadPoolExecutor(max_workers=50) as executor:
future_to_ip = {executor.submit(ping_scan, ip): ip for ip in ips}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
if future.result():
live_hosts.append(ip)
print(f"{ip}")
print(f"\n📍 Found {len(live_hosts)} live hosts")
print(f"\n2⃣ Scanning for Tasmota (HTTP) on live hosts...")
# Then check HTTP on live hosts
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
future_to_ip = {executor.submit(check_http, ip): ip for ip in live_hosts}
for future in concurrent.futures.as_completed(future_to_ip):
result = future.result()
if result:
tasmota_devices.append(result)
return tasmota_devices
if __name__ == "__main__":
devices = discover_tasmota()
print(f"\n{'='*60}")
if devices:
print(f"✅ Found {len(devices)} Tasmota device(s):\n")
for i, device in enumerate(devices, 1):
print(f"🔌 Device {i}:")
print(f" IP: {device['ip']}:{device['port']}")
print(f" Title: {device['title']}")
print(f" Server: {device['server']}")
print(f" Evidence:")
for ev in device['evidence']:
print(f"{ev}")
print(f" Response time: {device['elapsed']:.3f}s")
print()
else:
print("❌ No Tasmota devices found")
print(f"{'='*60}")