From 071fb123fe229846668bfd0c2e1f42e62fa78f20 Mon Sep 17 00:00:00 2001 From: sigma Date: Tue, 24 Jun 2025 12:50:09 +0100 Subject: [PATCH] first commit --- sigma.py | 170 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ theone.at | 36 ++++++++++++ theone.py | 84 +++++++++++++++++++++++++++ 3 files changed, 290 insertions(+) create mode 100644 sigma.py create mode 100644 theone.at create mode 100644 theone.py diff --git a/sigma.py b/sigma.py new file mode 100644 index 0000000..aed468f --- /dev/null +++ b/sigma.py @@ -0,0 +1,170 @@ +import serial +import time + +# === CONFIG === +SERIAL_PORT = "/dev/ttyS0" +BAUDRATE = 9600 +EMQX_HOST = "v92023ed.ala.eu-central-1.emqxsl.com" +EMQX_PORT = 8883 +CA_FILENAME = "ca.crt" +CA_LOCAL_PATH = "/home/sigma/emqxsl-ca.crt" +MQTT_TOPIC = "test/topic" +MQTT_CLIENT_ID = "rpi_sim7080_test" +#MQTT_MSG = "Hello over secure MQTT" +MQTT_QOS = 1 +SSL_CTX_ID = 1 +MQTT_CONN_ID = 0 + +NTP_SERVER = "0.cz.pool.ntp.org" + +# additional utilities +def send_at(cmd, wait_for="OK", timeout=2): + print(f"> {cmd}") + ser.write((cmd + "\r\n").encode()) + time.sleep(timeout) + response = ser.read_all().decode() + print(response) + return wait_for in response + +def wait_for_ready(): + while True: + line = ser.readline().decode() + print(line.strip()) + break + + +# init serial comunication +ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) +time.sleep(2) +ser.reset_input_buffer() +wait_for_ready() + + +# OLD Chat-GPT + edits code + +# === 1. Basic Setup === +#send_at("ATE0") +#send_at("AT+CPIN?") # Check SIM +#send_at("AT+CSQ") # Signal strength +#send_at("AT+CEREG=1") +#send_at("AT+CGDCONT=1,\"IP\",\"m2m.public.cz\"") +#send_at("AT+CGACT=1,1") +#send_at("AT+CGPADDR=1") +#send_at("AT+CFSINIT") +#send_at("AT+CNACT=0,1") +# +## === 2. Upload CA Certificate === +#def upload_cert(): +# with open(CA_LOCAL_PATH, "rb") as f: +# data = f.read() +# cert_len = len(data) +# +# # Allocate file +# send_at(f'AT+CFSWFILE=0,"{CA_FILENAME}",0,{cert_len},10000', "DOWNLOAD") +# ser.write(data) +# time.sleep(1) +# print(ser.read_all().decode()) +# +#upload_cert() +# +## === 3. Configure SSL Context === +#send_at(f"AT+CSSLCFG=\"sslversion\",1,3") # TLS 1.2 +#send_at(f"AT+CASSLCFG=1,\"cacert\",\"{CA_FILENAME}\"") +## send_at(f"AT+CSSLCFG=\"\",{SSL_CTX_ID},1") # Server auth only +#send_at(f"AT+CSSLCFG=\"ignorertctime\",1,1") +#send_at(f"AT+CSSLCFG=\"sni\",1,1") +# +## === 4. Configure MQTT === +#send_at(f"AT+SMCONF=\"URL\",\"{EMQX_HOST}\",{EMQX_PORT}") +#send_at(f"AT+SMCONF=\"KEEPTIME\",60") +#send_at(f"AT+SMCONF=\"CLIENTID\",\"{MQTT_CLIENT_ID}\"") +#send_at(f"AT+SMSSL=1,\"{CA_FILENAME}\",\"{CA_FILENAME}\"") +#send_at("AT+SMCONF=\"CLENASS\",1") +# +## === 5. Connect to MQTT Broker === +#if not send_at("AT+SMCONN", "OK", timeout=10): +# print("MQTT connection failed") +# exit() +# +## === 6. Publish Message === +#msg_len = len(MQTT_MSG) +#send_at(f"AT+SMPUB=\"{MQTT_TOPIC}\",{msg_len},{MQTT_QOS}", ">", timeout=1) +#ser.write(MQTT_MSG.encode() + b"\x1A") # End with Ctrl+Z +#time.sleep(2) +#print(ser.read_all().decode()) + + +# pin init +send_at("ATE0") +send_at("AT") +# reboot and wait +send_at("AT+CREBOOT") +print("rebooting... please wait") +failed = True +for i in range(1,10): + if send_at("ATE0", "OK", timeout=3): + failed = False + break + send_at("AT") +if failed: + print("failed to reboot") + exit() +# set as verbose +send_at("AT+CMEE=2") +# sim and rf status check +send_at("AT+CPIN?") +send_at("AT+CSQ") +# manual APN conf +send_at("AT+CFUN=0") +send_at("AT+CGDCONT=1,\"IP\",\"m2m.public.cz\"") +send_at("AT+CFUN=1") +send_at("AT+CGATT?") +send_at("AT+CGNAPN") +send_at("AT+CNCFG=0,1,\"m2m.public.cz\"") +# APN check +send_at("AT+CGNAPN") +# activate apn/pdp +send_at("AT+CNACT=0,1") +send_at("AT+CNACT?") +# synch UTC +send_at(f'AT+CNTP="{NTP_SERVER}",2,0,2') +time.sleep(12) +send_at("AT+CNTP") +send_at("AT+CCLK?") +# file system init +send_at("AT+CFSINIT") +# upload ca certificate +with open(CA_LOCAL_PATH, "rb") as f: + data = f.read() +cert_len = len(data) +send_at(f'AT+CFSWFILE=1,"{CA_FILENAME}",0,{cert_len},10000', "DOWNLOAD") +time.sleep(1) +print(ser.read_all().decode()) +ser.write(data) +send_at("AT+CFSTERM") +# setup mqtt with ssl +send_at(f'AT+SMCONF="URL","{EMQX_HOST}","{EMQX_PORT}"') +send_at("AT+SMCONF=\"KEEPTIME\",60") +send_at("AT+SMCONF=\"CLEANSS\",1") +send_at(f'AT+SMCONF="CLIENTID","{MQTT_CLIENT_ID}"') +send_at('AT+SMCONF="QOS",1') +send_at("AT+SMCONF?") +# set tls version to 1.2, ignore rtc time and set cipher. then check it +send_at('AT+CSSLCFG="sslversion",1,3') +send_at('AT+CSSLCFG="IGNORERTCTIME",1,1') +send_at('AT+CSSLCFG=“CIPHERSUITE”,1,0,0x0035') +send_at('AT+CSSLCFG=“CIPHERSUITE”,1,1,0x002F') +send_at("AT+CSSLCFG?") +time.sleep(10) +# convert and use uploaded certificate +send_at(f'AT+CSSLCFG="CONVERT",2,"{CA_FILENAME}"') +send_at(f'AT+SMSSL=2,"{CA_FILENAME}",""') # no idea how to CHOOSE the certificte +send_at("AT+SMSSL?") +# try to connect +if not send_at("AT+SMCONN", "OK", timeout=20): + print(" - FAILED to connect") +# disconnect/clean-up +send_at("AT+SMDISC") +send_at("AT+CNACT=0,0") + +ser.close() diff --git a/theone.at b/theone.at new file mode 100644 index 0000000..6ae4532 --- /dev/null +++ b/theone.at @@ -0,0 +1,36 @@ +#reboot +ATE0 +AT +AT+CREBOOT +ATE0 +AT +ATE0 +#set verbose +AT+CMEE=2 +#sim and rf checks +AT+CPIN? +AT+CSQ +#fs init +AT+CFSINIT +#upload ca cert +cacert +#convert ca cert and end uploading +AT+CSSLCFG="CONVERT",2,ca.pem +AT+CFSTERM +#manual APN conf +AT+CFUN=0 +AT+CGDCONT=$pdpcid$,"IP",$apn_server$ +AT+CFUN=1 +AT+CGATT? +AT+CGNAPN +AT+CNCFG=$pdpcid$,1,$apn_server$ +#APN check +AT+CGNAPN +#activate APN/PDP +AT+CNACT=0,1 +AT+CNACT? +#synch UTC +AT+CNTP=$ntp_server$,2,0,2 +s 15 +AT+CNTP +AT+CCLK? diff --git a/theone.py b/theone.py new file mode 100644 index 0000000..c986c8a --- /dev/null +++ b/theone.py @@ -0,0 +1,84 @@ +import serial +import time +import re +from colorama import Fore, Style + +SERIAL_PORT = "/dev/ttyS0" +BAUDRATE = 9600 + +variables = { + 'emqx_host': "v92023ed.ala.eu-central-1.emqxsl.com", + 'emqx_port': 8883, + 'mqtt_client_id': "rpi_sim7080_test", + 'ca_local_path': "/home/sigma/emqxsl-ca.crt", + 'ca_filename': 'ca.pem', + 'ntp_server': '0.cz.pool.ntp.org', + 'pdpcid': 0 +} + +def send_at(cmd, timeout=2): + print(f"{Fore.MAGENTA} {cmd}{Style.RESET_ALL}", end="") + + ser.write((cmd + "\r\n").encode()) + time.sleep(timeout) + response = ser.read_all().decode().strip().replace("\n\n", "\n") + + if "OK" == response: + print(f" {Fore.GREEN}{response}{Style.RESET_ALL}") + return True + elif "ERROR" in response: + print(f"\n {Fore.RED}{response}{Style.RESET_ALL}") + return False + elif response == "": + print() + else: + print(f"\n {Fore.CYAN}{response}{Style.RESET_ALL}") + return False + +def wait_for_ready(): + while True: + line = ser.readline().decode() + print(line.strip()) + break + +def upload_ca_cert(): + ca_local_path = variables.get('ca_local_path') + with open(ca_local_path, "rb") as f: + data = f.read() + cert_len = len(data) + ca_filename = variables.get('ca_filename') + send_at(f'AT+CFSWFILE=3,"{ca_filename}",0,{cert_len},10000', "DOWNLOAD") + time.sleep(1) + print(ser.read_all().decode()) + ser.write(data) + +def replace_vars(text): + if isinstance(text, bytes): + text = text.decode() + def replace_var(match): + var_name = match.group(1) + return variables.get(var_name, match.group(0)) + return re.sub(r'\$(.*?)\$', replace_var, text) + + +ser = serial.Serial(SERIAL_PORT, BAUDRATE, timeout=1) +time.sleep(2) +ser.reset_input_buffer() +wait_for_ready() + +with open("theone.at", "rb") as f: + data = f.readlines() + for raw_line in data: + line = raw_line.decode("utf-8").strip() + if line.startswith("#"): + print(Style.BRIGHT + line[1:] + Style.RESET_ALL) + elif line.startswith("s"): + match = re.search(r'\d+', line) + if match: + sleep_time = int(match.group()) + print(f"Sleeping for {sleep_time} seconds...") + time.sleep(sleep_time) + elif line == "cacert": + upload_ca_cert() + else: + send_at(replace_vars(line,))