import serial import time import re from colorama import Fore, Style SERIAL_PORT = "/dev/ttyS0" BAUDRATE = 9600 variables = { 'emqx_host': "v92023ed.ala.eu-central-1.emqxsl.com", 'emqx_port': 8883, 'mqtt_client_id': "rpi_sim7080_test", 'ca_local_path': "/home/sigma/emqxsl-ca.crt", 'ca_filename': 'ca.pem', 'ntp_server': '0.cz.pool.ntp.org', 'pdpcid': 0 } def send_at(cmd, timeout=2): print(f"{Fore.MAGENTA} {cmd}{Style.RESET_ALL}", end="") ser.write((cmd + "\r\n").encode()) time.sleep(timeout) response = ser.read_all().decode().strip().replace("\n\n", "\n") if "OK" == response: print(f" {Fore.GREEN}{response}{Style.RESET_ALL}") return True elif "ERROR" in response: print(f"\n {Fore.RED}{response}{Style.RESET_ALL}") return False elif response == "": print() else: print(f"\n {Fore.CYAN}{response}{Style.RESET_ALL}") return False def wait_for_ready(): while True: line = ser.readline().decode() print(line.strip()) break def upload_ca_cert(): ca_local_path = variables.get('ca_local_path') with open(ca_local_path, "rb") as f: data = f.read() cert_len = len(data) ca_filename = variables.get('ca_filename') send_at(f'AT+CFSWFILE=3,"{ca_filename}",0,{cert_len},10000', "DOWNLOAD") time.sleep(1) print(ser.read_all().decode()) ser.write(data) def replace_vars(text): if isinstance(text, bytes): text = text.decode() def replace_var(match): var_name = match.group(1) return variables.get(var_name, match.group(0)) return re.sub(r'\$(.*?)\$', replace_var, text) ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) time.sleep(2) ser.reset_input_buffer() wait_for_ready() with open("theone.at", "rb") as f: data = f.readlines() for raw_line in data: line = raw_line.decode("utf-8").strip() if line.startswith("#"): print(Style.BRIGHT + line[1:] + Style.RESET_ALL) elif line.startswith("s"): match = re.search(r'\d+', line) if match: sleep_time = int(match.group()) print(f"Sleeping for {sleep_time} seconds...") time.sleep(sleep_time) elif line == "cacert": upload_ca_cert() else: send_at(replace_vars(line,))