sim7080_at_setup/theone.py
2025-06-24 12:50:09 +01:00

85 lines
2.4 KiB
Python

import serial
import time
import re
from colorama import Fore, Style
SERIAL_PORT = "/dev/ttyS0"
BAUDRATE = 9600
variables = {
'emqx_host': "v92023ed.ala.eu-central-1.emqxsl.com",
'emqx_port': 8883,
'mqtt_client_id': "rpi_sim7080_test",
'ca_local_path': "/home/sigma/emqxsl-ca.crt",
'ca_filename': 'ca.pem',
'ntp_server': '0.cz.pool.ntp.org',
'pdpcid': 0
}
def send_at(cmd, timeout=2):
print(f"{Fore.MAGENTA} {cmd}{Style.RESET_ALL}", end="")
ser.write((cmd + "\r\n").encode())
time.sleep(timeout)
response = ser.read_all().decode().strip().replace("\n\n", "\n")
if "OK" == response:
print(f" {Fore.GREEN}{response}{Style.RESET_ALL}")
return True
elif "ERROR" in response:
print(f"\n {Fore.RED}{response}{Style.RESET_ALL}")
return False
elif response == "":
print()
else:
print(f"\n {Fore.CYAN}{response}{Style.RESET_ALL}")
return False
def wait_for_ready():
while True:
line = ser.readline().decode()
print(line.strip())
break
def upload_ca_cert():
ca_local_path = variables.get('ca_local_path')
with open(ca_local_path, "rb") as f:
data = f.read()
cert_len = len(data)
ca_filename = variables.get('ca_filename')
send_at(f'AT+CFSWFILE=3,"{ca_filename}",0,{cert_len},10000', "DOWNLOAD")
time.sleep(1)
print(ser.read_all().decode())
ser.write(data)
def replace_vars(text):
if isinstance(text, bytes):
text = text.decode()
def replace_var(match):
var_name = match.group(1)
return variables.get(var_name, match.group(0))
return re.sub(r'\$(.*?)\$', replace_var, text)
ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
time.sleep(2)
ser.reset_input_buffer()
wait_for_ready()
with open("theone.at", "rb") as f:
data = f.readlines()
for raw_line in data:
line = raw_line.decode("utf-8").strip()
if line.startswith("#"):
print(Style.BRIGHT + line[1:] + Style.RESET_ALL)
elif line.startswith("s"):
match = re.search(r'\d+', line)
if match:
sleep_time = int(match.group())
print(f"Sleeping for {sleep_time} seconds...")
time.sleep(sleep_time)
elif line == "cacert":
upload_ca_cert()
else:
send_at(replace_vars(line,))