85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
import serial
|
|
import time
|
|
import re
|
|
from colorama import Fore, Style
|
|
|
|
# Serial device and speed used when the port is opened further down.
SERIAL_PORT = "/dev/ttyS0"
BAUDRATE = 9600

# Values substituted for $name$ placeholders by replace_vars(); the two
# ca_* entries are also read directly by upload_ca_cert().
variables = dict(
    emqx_host="v92023ed.ala.eu-central-1.emqxsl.com",
    emqx_port=8883,
    mqtt_client_id="rpi_sim7080_test",
    ca_local_path="/home/sigma/emqxsl-ca.crt",
    ca_filename="ca.pem",
    ntp_server="0.cz.pool.ntp.org",
    pdpcid=0,
)
|
|
|
|
def send_at(cmd, timeout=2):
    """Send one command over the serial port and report whether it succeeded.

    The command is echoed to the console, written to the module-level
    ``ser`` port terminated by CRLF, and whatever the device has sent back
    after ``timeout`` seconds is read and printed, colour-coded by outcome.

    Args:
        cmd: Command text without the line terminator.
        timeout: Seconds to sleep before collecting the response.

    Returns:
        True only when the whole (stripped) response is exactly ``OK``;
        False for an error, an empty response, or any other reply.
    """
    print(f"{Fore.MAGENTA} {cmd}{Style.RESET_ALL}", end="")

    ser.write((cmd + "\r\n").encode())
    time.sleep(timeout)

    # errors="replace": a stray non-UTF-8 byte on the line must not abort
    # the whole run with UnicodeDecodeError.
    response = ser.read_all().decode(errors="replace")
    # Drop carriage returns first so that CRLF-terminated replies actually
    # contain the "\n\n" sequences the blank-line collapse looks for.
    response = response.replace("\r", "").strip().replace("\n\n", "\n")

    if response == "OK":
        print(f" {Fore.GREEN}{response}{Style.RESET_ALL}")
        return True

    if "ERROR" in response:
        print(f"\n {Fore.RED}{response}{Style.RESET_ALL}")
    elif response == "":
        # Nothing came back; just finish the echoed command line.
        print()
    else:
        print(f"\n {Fore.CYAN}{response}{Style.RESET_ALL}")
    return False
|
|
|
|
def wait_for_ready():
    """Read a single line from the serial port and echo it, stripped.

    Despite the name, no readiness marker is checked: exactly one
    ``readline()`` call is made on the module-level ``ser`` and its decoded
    result is printed.
    """
    first_line = ser.readline().decode()
    print(first_line.strip())
|
|
|
|
def upload_ca_cert():
    """Send the local CA certificate file to the device over the serial port.

    Reads the file at ``variables['ca_local_path']``, announces the transfer
    with an ``AT+CFSWFILE`` command that carries the target file name
    (``variables['ca_filename']``) and the byte count, then writes the raw
    file contents to the module-level ``ser`` port.
    """
    ca_local_path = variables.get('ca_local_path')
    ca_filename = variables.get('ca_filename')

    with open(ca_local_path, "rb") as f:
        data = f.read()
    cert_len = len(data)

    # send_at's second parameter is a delay in seconds, not an expected
    # reply; passing a string there makes time.sleep() raise TypeError, so
    # the default delay is used instead.
    # NOTE(review): the trailing 10000 is presumably the device-side input
    # window in ms - confirm against the modem's AT command manual.
    send_at(f'AT+CFSWFILE=3,"{ca_filename}",0,{cert_len},10000')

    time.sleep(1)
    # Print anything that arrived since send_at collected its response.
    print(ser.read_all().decode())
    ser.write(data)
|
|
|
|
def replace_vars(text, values=None):
    """Expand ``$name$`` placeholders in *text*.

    Args:
        text: The text to expand; ``bytes`` input is decoded first.
        values: Optional mapping of placeholder name to value. When omitted,
            the module-level ``variables`` dict is used.

    Returns:
        The text as ``str`` with every known placeholder replaced by
        ``str(value)``. Placeholders whose name is not in the mapping are
        left untouched, including their ``$`` delimiters.
    """
    if isinstance(text, bytes):
        text = text.decode()

    mapping = variables if values is None else values

    def replace_var(match):
        var_name = match.group(1)
        if var_name not in mapping:
            return match.group(0)
        # re.sub requires the callback to return str; some configured
        # values are ints, which would otherwise raise TypeError.
        return str(mapping[var_name])

    return re.sub(r'\$(.*?)\$', replace_var, text)
|
|
|
|
|
|
# Module-level port handle; send_at(), wait_for_ready() and upload_ca_cert()
# all read it as a global.
ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1)
try:
    # Pause after opening, then discard anything already buffered.
    time.sleep(2)
    ser.reset_input_buffer()
    wait_for_ready()

    with open("theone.at", "rb") as f:
        data = f.readlines()

    # Script line kinds:
    #   "#..."    heading, printed in bright style
    #   "s..."    sleep for the first integer found on the line
    #   "cacert"  upload the CA certificate
    #   other     sent as a command after $name$ placeholder expansion
    for raw_line in data:
        line = raw_line.decode("utf-8").strip()

        if not line:
            # A blank line would otherwise be sent as a bare CRLF and cost
            # a full send_at() delay.
            continue

        if line.startswith("#"):
            print(Style.BRIGHT + line[1:] + Style.RESET_ALL)
        elif line.startswith("s"):
            match = re.search(r'\d+', line)
            if match:
                sleep_time = int(match.group())
                print(f"Sleeping for {sleep_time} seconds...")
                time.sleep(sleep_time)
        elif line == "cacert":
            upload_ca_cert()
        else:
            send_at(replace_vars(line))
finally:
    # Release the serial device even if a step above raised.
    ser.close()
|