Select Git revision
main.py 11.42 KiB
import json
import os
import threading
import time
import queue
import base64
import pygame
from flask import request, Flask, Response, Request
import ledpatterns
from absled import ILedString
import screenwidgets
from config import PROP_AVAILABLE_WIDGETS, PROP_AVAILABLE_PATTERNS, PROP_WINDOW_SCALE, PROP_WINDOW_FULLSCREEN, \
PROP_SCREEN_WIDTH, PROP_SCREEN_HEIGHT, PROP_HIDE_MOUSE, PROP_LEDS_DRIVER, PROP_LEDS_LENGTH, PROP_LEDS_LOCATION, \
PROP_FLASK_DEBUG, PROP_FLASK_HOST, PROP_FLASK_PORT, hw_config
app = Flask(__name__)
module_config = {}
widget_sequence = []
pattern_sequence = []
def load_module_config():
global module_config
# Load defaults
module_defaults = json.load(open("module_defaults.json"))
module_config = {}
for module_name, module_conf in module_defaults.items():
add_conf = {}
for setting, props in module_conf.items():
add_conf[setting] = props.get("default", None)
module_config[module_name] = add_conf
# Load config over defaults
if os.path.exists("module_config.json"):
mconf = json.load(open("module_config.json"))
for module_name, module_conf in mconf.items():
if module_name in module_config:
for setting, value in module_conf.items():
if setting in module_config[module_name]:
module_config[module_name][setting] = value
# Save config
with open("module_config.json", "w") as f:
json.dump(module_config, f, indent=4)
def save_module_config():
with open("module_config.json", "w") as f:
json.dump(module_config, f, indent=4)
def load_sequences():
global widget_sequence, pattern_sequence
with open("sequence_default.json") as f:
seq = json.load(f)
widget_sequence = seq.get("widgets", [])
pattern_sequence = seq.get("patterns", [])
if os.path.exists("sequence_config.json"):
with open("sequence_config.json") as f:
seq = json.load(f)
widget_sequence = seq.get("widgets", widget_sequence)
pattern_sequence = seq.get("patterns", pattern_sequence)
mainthread_queue = queue.Queue()
display_queue = queue.Queue()
led_queue = queue.Queue()
def json_response(data, status=200):
return Response(json.dumps(data), status=status, mimetype="application/json")
def verify_json_request(req: Request, required_fields=None):
if required_fields is None:
required_fields = []
if not req.is_json:
return False, json_response({
"error": "Request is not JSON. Please make sure mimetype is set to application/json."
}, 400)
for field in required_fields:
if field not in req.json:
return False, json_response({
"error": f"Missing field {field}"
}, 400)
return True, None
##############
# Web Routes #
##############
@app.route("/")
def wr_index():
return f"""
<h1>LED Clock</h1>
Available widgets: {PROP_AVAILABLE_WIDGETS}<br>
Available patterns: {PROP_AVAILABLE_PATTERNS}<br>
<a href="/api/get_config">Config</a>
"""
@app.route("/api/available_fonts")
def wr_available_fonts():
return json_response(pygame.font.get_fonts())
@app.route("/api/available_widgets")
def wr_available_widgets():
return json_response(PROP_AVAILABLE_WIDGETS)
@app.route("/api/available_patterns")
def wr_available_patterns():
return json_response(PROP_AVAILABLE_PATTERNS)
@app.route("/api/set_widget", methods=["POST"])
def wr_set_widget():
# Make sure the request is valid
valid, validation_response = verify_json_request(request, ["widget"])
if not valid:
return validation_response
# Verify pattern is valid and available
widget_name = request.json.get("widget")
if widget_name is None:
return json_response({
"error": "No widget provided"
}, 400)
if widget_name not in PROP_AVAILABLE_WIDGETS:
return json_response({
"error": f"Widget {widget_name} is not available"
}, 400)
config_override = request.json.get("config", {})
if type(config_override) is not dict:
return json_response({
"error": "Config must be an Object"
}, 400)
config_b64 = base64.b64encode(json.dumps(config_override).encode("utf-8")).decode("utf-8")
# Send the pattern to the LED thread
display_queue.put("SET_WIDGET:" + widget_name + ":" + config_b64)
# Return success
return json_response({
"status": "OK",
"pattern": widget_name
})
@app.route("/api/set_pattern", methods=["POST"])
def wr_set_pattern():
# Make sure the request is valid
valid, validation_response = verify_json_request(request, ["pattern"])
if not valid:
return validation_response
# Verify pattern is valid and available
pattern_name = request.json.get("pattern")
if pattern_name is None:
return json_response({
"error": "No pattern provided"
}, 400)
if pattern_name not in PROP_AVAILABLE_PATTERNS:
return json_response({
"error": f"Pattern {pattern_name} is not available"
}, 400)
config_override = request.json.get("config", {})
if type(config_override) is not dict:
return json_response({
"error": "Config must be an Object"
}, 400)
config_b64 = base64.b64encode(json.dumps(config_override).encode("utf-8")).decode("utf-8")
# Send the pattern to the LED thread
led_queue.put("SET_PATTERN:" + pattern_name + ":" + config_b64)
# Return success
return json_response({
"status": "OK",
"pattern": pattern_name
})
@app.route("/api/get_config")
def wr_get_config():
return json_response(module_config)
#####################
# Loading functions #
#####################
def load_widget(widget_name: str, config_override: dict=None):
# Get the widget properties from the config file
widget_props = module_config.get(widget_name, {}).copy()
if config_override is not None:
widget_props.update(config_override)
# Scale the properties that are supposed to be scalable
for k, v in widget_props.copy().items():
if (isinstance(v, int) or isinstance(v, float)) and k.startswith("scalable_"):
widget_props[k.removeprefix("scalable_")] = v * PROP_WINDOW_SCALE
# Add the pygame module to the widget properties
widget_props["pygame"] = pygame
# Load the widget
return getattr(screenwidgets, widget_name)(widget_props)
def load_pattern(pattern_name: str, leds: ILedString, config_override: dict=None):
pattern_props = module_config.get(pattern_name, {}).copy()
if config_override is not None:
pattern_props.update(config_override)
return getattr(ledpatterns, pattern_name)(pattern_props, leds)
def run_screen():
# Initialise Pygame and create a window
pygame.init()
pg_options = 0x0
if PROP_WINDOW_FULLSCREEN:
pg_options |= pygame.FULLSCREEN
screen = pygame.display.set_mode(
(PROP_SCREEN_WIDTH * PROP_WINDOW_SCALE, PROP_SCREEN_HEIGHT * PROP_WINDOW_SCALE),
pg_options
)
pygame.display.set_caption("Clock")
pygame.mouse.set_visible(not PROP_HIDE_MOUSE)
# Create a clock for FPS counter and timing
clock = pygame.time.Clock()
# Init widgets
widget_to_load = PROP_AVAILABLE_WIDGETS[0] # TODO change to load multiple widgets
current_widget = load_widget(widget_to_load)
# Main loop
running = True
while running:
for event in pygame.event.get():
if event.type == pygame.QUIT:
running = False
# Clear the screen
screen.fill((0, 0, 0))
# Check for messages
try:
message = display_queue.get_nowait()
parts = message.split(":")
if parts[0] == "SET_WIDGET":
widget_to_load = parts[1]
if widget_to_load in PROP_AVAILABLE_WIDGETS: # Check input validity
config_override = {}
if len(parts) == 3:
try:
config_override = json.loads(base64.b64decode(parts[2]).decode("utf-8"))
except json.JSONDecodeError:
pass
current_widget = load_widget(widget_to_load, config_override)
except queue.Empty:
pass
# Draw stuff here
current_widget.draw(screen)
# Display FPS counter
if hw_config["show_fps"]:
fps = clock.get_fps()
font = pygame.font.Font(None, 36)
text = font.render(f"FPS: {fps:.2f}", True, (255, 255, 255))
screen.blit(text, (10, 10))
# Tick the clock
if hw_config["fps_max"] > 0:
clock.tick(hw_config["fps_max"])
else:
clock.tick()
# Update the screen
pygame.display.flip()
# Send exit message to main thread
mainthread_queue.put("EXIT")
# Quit Pygame
pygame.quit()
def run_leds():
# Load LEDs
if PROP_LEDS_DRIVER == "dummy":
from absled.dummy import DummyLedString
leds = DummyLedString("", PROP_LEDS_LENGTH)
elif PROP_LEDS_DRIVER == "npixel":
from absled.npixel import NeopixelLedString
leds = NeopixelLedString(PROP_LEDS_LOCATION, PROP_LEDS_LENGTH)
else:
raise ValueError("Invalid LED driver")
current_pattern = load_pattern(PROP_AVAILABLE_PATTERNS[0], leds)
pattern_timer = module_config.get(PROP_AVAILABLE_PATTERNS[0], {}).get("tick_rate", -1)
current_pattern.tick()
while True:
try:
message = led_queue.get_nowait()
parts = message.split(":")
if parts[0] == "SET_PATTERN":
pattern_name = parts[1]
if pattern_name in PROP_AVAILABLE_PATTERNS:
config_override = {}
if len(parts) == 3:
try:
config_override = json.loads(base64.b64decode(parts[2]).decode("utf-8"))
except json.JSONDecodeError:
pass
current_pattern = load_pattern(pattern_name, leds, config_override)
pattern_timer = module_config.get(pattern_name, {}).get("tick_rate", -1)
current_pattern.tick()
elif message == "EXIT":
break
except queue.Empty:
pass
if pattern_timer >= 0:
time.sleep(pattern_timer / 1000)
current_pattern.tick()
if __name__ == "__main__":
# Load configuration
load_module_config()
# Fork off pygame
pygame_thread = threading.Thread(target=run_screen, daemon=True)
pygame_thread.start()
# Fork off flask server
flask_thread = threading.Thread(
target=lambda: app.run(debug=PROP_FLASK_DEBUG, host=PROP_FLASK_HOST, port=PROP_FLASK_PORT, use_reloader=False),
daemon=True)
flask_thread.start()
# Fork off LED thread
led_thread = threading.Thread(target=run_leds, daemon=True)
led_thread.start()
# Wait for the threads to finish
try:
while True:
try:
message = mainthread_queue.get_nowait()
if message == "EXIT":
display_queue.put("EXIT")
led_queue.put("EXIT")
break
except queue.Empty:
pass
# Keep main thread alive
time.sleep(1)
except KeyboardInterrupt:
exit()
exit()