Skip to content
Snippets Groups Projects
Verified Commit 0a1841f6 authored by TheJoeCoder's avatar TheJoeCoder
Browse files

Start doing sequences

parent fd5abbe6
No related branches found
No related tags found
No related merge requests found
...@@ -6,7 +6,7 @@ import queue ...@@ -6,7 +6,7 @@ import queue
import base64 import base64
import pygame import pygame
from flask import request, Flask, Response, Request from flask import request, Flask
import ledpatterns import ledpatterns
from absled import ILedString from absled import ILedString
...@@ -14,6 +14,8 @@ import screenwidgets ...@@ -14,6 +14,8 @@ import screenwidgets
from config import PROP_AVAILABLE_WIDGETS, PROP_AVAILABLE_PATTERNS, PROP_WINDOW_SCALE, PROP_WINDOW_FULLSCREEN, \ from config import PROP_AVAILABLE_WIDGETS, PROP_AVAILABLE_PATTERNS, PROP_WINDOW_SCALE, PROP_WINDOW_FULLSCREEN, \
PROP_SCREEN_WIDTH, PROP_SCREEN_HEIGHT, PROP_HIDE_MOUSE, PROP_LEDS_DRIVER, PROP_LEDS_LENGTH, PROP_LEDS_LOCATION, \ PROP_SCREEN_WIDTH, PROP_SCREEN_HEIGHT, PROP_HIDE_MOUSE, PROP_LEDS_DRIVER, PROP_LEDS_LENGTH, PROP_LEDS_LOCATION, \
PROP_FLASK_DEBUG, PROP_FLASK_HOST, PROP_FLASK_PORT, hw_config PROP_FLASK_DEBUG, PROP_FLASK_HOST, PROP_FLASK_PORT, hw_config
from utils.jsonencoding import get_cfg_from_request, decode_dict
from utils.net import json_response, verify_json_request
app = Flask(__name__) app = Flask(__name__)
...@@ -64,30 +66,17 @@ def load_sequences(): ...@@ -64,30 +66,17 @@ def load_sequences():
widget_sequence = seq.get("widgets", widget_sequence) widget_sequence = seq.get("widgets", widget_sequence)
pattern_sequence = seq.get("patterns", pattern_sequence) pattern_sequence = seq.get("patterns", pattern_sequence)
def save_sequences():
with open("sequence_config.json", "w") as f:
json.dump({
"widgets": widget_sequence,
"patterns": pattern_sequence
}, f, indent=4)
mainthread_queue = queue.Queue() mainthread_queue = queue.Queue()
display_queue = queue.Queue() display_queue = queue.Queue()
led_queue = queue.Queue() led_queue = queue.Queue()
def json_response(data, status=200):
return Response(json.dumps(data), status=status, mimetype="application/json")
def verify_json_request(req: Request, required_fields=None):
if required_fields is None:
required_fields = []
if not req.is_json:
return False, json_response({
"error": "Request is not JSON. Please make sure mimetype is set to application/json."
}, 400)
for field in required_fields:
if field not in req.json:
return False, json_response({
"error": f"Missing field {field}"
}, 400)
return True, None
############## ##############
# Web Routes # # Web Routes #
...@@ -133,13 +122,10 @@ def wr_set_widget(): ...@@ -133,13 +122,10 @@ def wr_set_widget():
"error": f"Widget {widget_name} is not available" "error": f"Widget {widget_name} is not available"
}, 400) }, 400)
config_override = request.json.get("config", {}) # Get the config (or return error)
if type(config_override) is not dict: config_b64, config_error = get_cfg_from_request(request)
return json_response({ if config_error is not None:
"error": "Config must be an Object" return config_error
}, 400)
config_b64 = base64.b64encode(json.dumps(config_override).encode("utf-8")).decode("utf-8")
# Send the pattern to the LED thread # Send the pattern to the LED thread
display_queue.put("SET_WIDGET:" + widget_name + ":" + config_b64) display_queue.put("SET_WIDGET:" + widget_name + ":" + config_b64)
...@@ -169,13 +155,10 @@ def wr_set_pattern(): ...@@ -169,13 +155,10 @@ def wr_set_pattern():
"error": f"Pattern {pattern_name} is not available" "error": f"Pattern {pattern_name} is not available"
}, 400) }, 400)
config_override = request.json.get("config", {}) # Get the config (or return error)
if type(config_override) is not dict: config_b64, config_response = get_cfg_from_request(request)
return json_response({ if config_response is not None:
"error": "Config must be an Object" return config_response
}, 400)
config_b64 = base64.b64encode(json.dumps(config_override).encode("utf-8")).decode("utf-8")
# Send the pattern to the LED thread # Send the pattern to the LED thread
led_queue.put("SET_PATTERN:" + pattern_name + ":" + config_b64) led_queue.put("SET_PATTERN:" + pattern_name + ":" + config_b64)
...@@ -377,10 +360,28 @@ if __name__ == "__main__": ...@@ -377,10 +360,28 @@ if __name__ == "__main__":
while True: while True:
try: try:
message = mainthread_queue.get_nowait() message = mainthread_queue.get_nowait()
parts = message.split(":")
if message == "EXIT": if message == "EXIT":
display_queue.put("EXIT") display_queue.put("EXIT")
led_queue.put("EXIT") led_queue.put("EXIT")
break break
elif parts[0] == "UPDATE_ALL_MODULE_CFG" and len(parts) == 2:
module_config = decode_dict(parts[1])
save_module_config()
elif parts[0] == "SAVE_WIDGET_SEQUENCE_CFG" and len(parts) == 2:
widget_sequence = decode_dict(parts[1])
with open("sequence_config.json", "w") as f:
json.dump({
"widgets": widget_sequence,
"patterns": pattern_sequence
}, f, indent=4)
elif parts[0] == "SAVE_PATTERN_SEQUENCE_CFG" and len(parts) == 2:
pattern_sequence = decode_dict(parts[1])
with open("sequence_config.json", "w") as f:
json.dump({
"widgets": widget_sequence,
"patterns": pattern_sequence
}, f, indent=4)
except queue.Empty: except queue.Empty:
pass pass
# Keep main thread alive # Keep main thread alive
......
import base64
import json
from main import json_response
def encode_dict(d: dict|list) -> str:
return base64.b64encode(json.dumps(d).encode("utf-8")).decode("utf-8")
def decode_dict(s: str) -> dict|list:
return json.loads(base64.b64decode(s.encode("utf-8")).decode("utf-8"))
def get_cfg_from_request(request):
config_override = request.json.get("config", {})
if type(config_override) is not dict:
return None, json_response({
"error": "Config must be an Object"
}, 400)
config_b64 = encode_dict(config_override)
return config_b64, None
\ No newline at end of file
import json
import socket import socket
from typing import Any
from flask import Response, Request
def get_ip():
def get_ip() -> str:
"""
Gets the IP address of the current machine
:return: the IP address as a string
"""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.settimeout(0) s.settimeout(0)
try: try:
...@@ -13,3 +21,37 @@ def get_ip(): ...@@ -13,3 +21,37 @@ def get_ip():
finally: finally:
s.close() s.close()
return ip_addr return ip_addr
def json_response(data: Any, status=200) -> Response:
"""
Generate a JSON response from any json-serializable data
:param data: The data to be sent
:param status: The status code of the response
:return: A JSON response
"""
return Response(json.dumps(data), status=status, mimetype="application/json")
def verify_json_request(req: Request, required_fields: list|None = None) -> tuple[bool, Response|None]:
"""
Verify that a request is JSON and contains all required fields
:param req: The request to verify
:param required_fields: A list of required fields, or None
:return: A tuple containing a boolean indicating if the request is valid and a response if it is not
"""
if required_fields is None:
required_fields = []
if not req.is_json:
return False, json_response({
"error": "Request is not JSON. Please make sure mimetype is set to application/json."
}, 400)
for field in required_fields:
if field not in req.json:
return False, json_response({
"error": f"Missing field {field}"
}, 400)
return True, None
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment