Skip to content
Snippets Groups Projects
Select Git revision
  • eaaf548fcdf123b60cea49223592a5958c1716f7
  • master default protected
2 results

main.py

Blame
  • main.py 11.42 KiB
    import json
    import os
    import threading
    import time
    import queue
    import base64
    
    import pygame
    from flask import request, Flask, Response, Request
    
    import ledpatterns
    from absled import ILedString
    import screenwidgets
    from config import PROP_AVAILABLE_WIDGETS, PROP_AVAILABLE_PATTERNS, PROP_WINDOW_SCALE, PROP_WINDOW_FULLSCREEN, \
        PROP_SCREEN_WIDTH, PROP_SCREEN_HEIGHT, PROP_HIDE_MOUSE, PROP_LEDS_DRIVER, PROP_LEDS_LENGTH, PROP_LEDS_LOCATION, \
        PROP_FLASK_DEBUG, PROP_FLASK_HOST, PROP_FLASK_PORT, hw_config
    
    # Flask application object; the HTTP routes below are registered on it.
    app = Flask(__name__)


    # Effective per-module settings, keyed by module name. Rebuilt by load_module_config().
    module_config = {}
    # Widget / pattern sequences. Replaced by load_sequences().
    widget_sequence = []
    pattern_sequence = []
    
    def load_module_config():
        """Rebuild the global ``module_config`` from defaults plus saved overrides.

        Reads ``module_defaults.json`` (module name -> setting name -> properties)
        and takes each setting's ``"default"`` property (``None`` when absent) as
        its initial value. If ``module_config.json`` exists, its values replace the
        defaults, but only for modules and settings that the defaults file knows
        about; anything else in the saved file is dropped. The merged result is
        then written back to ``module_config.json``.

        Raises:
            FileNotFoundError: if ``module_defaults.json`` is missing.
            json.JSONDecodeError: if either file contains invalid JSON.
        """
        global module_config

        # Load defaults (context manager so the handle is closed promptly)
        with open("module_defaults.json") as f:
            module_defaults = json.load(f)

        module_config = {
            module_name: {
                setting: props.get("default", None)
                for setting, props in module_conf.items()
            }
            for module_name, module_conf in module_defaults.items()
        }

        # Load config over defaults. A missing file simply means "no overrides".
        try:
            with open("module_config.json") as f:
                saved_config = json.load(f)
        except FileNotFoundError:
            saved_config = {}

        for module_name, module_conf in saved_config.items():
            if module_name not in module_config:
                continue
            known_settings = module_config[module_name]
            for setting, value in module_conf.items():
                if setting in known_settings:
                    known_settings[setting] = value

        # Save config
        with open("module_config.json", "w") as f:
            json.dump(module_config, f, indent=4)
    
    def save_module_config():
        """Write the current ``module_config`` to ``module_config.json`` as indented JSON."""
        with open("module_config.json", mode="w") as config_file:
            json.dump(module_config, config_file, indent=4)
    
    def load_sequences():
        """Load the widget and pattern sequences into the module globals.

        ``sequence_default.json`` is read first; its ``"widgets"`` and
        ``"patterns"`` entries (empty lists when absent) become the sequences.
        If ``sequence_config.json`` exists, any of those two keys it contains
        replaces the corresponding default.
        """
        global widget_sequence, pattern_sequence

        with open("sequence_default.json") as default_file:
            defaults = json.load(default_file)
        widget_sequence = defaults.get("widgets", [])
        pattern_sequence = defaults.get("patterns", [])

        # Nothing more to do when the user has not saved a sequence of their own
        if not os.path.exists("sequence_config.json"):
            return

        with open("sequence_config.json") as config_file:
            overrides = json.load(config_file)
        widget_sequence = overrides.get("widgets", widget_sequence)
        pattern_sequence = overrides.get("patterns", pattern_sequence)
    
    # Message queues used to pass string commands between threads.
    # mainthread_queue: run_screen() posts "EXIT" here; the __main__ loop polls it.
    mainthread_queue = queue.Queue()
    # display_queue: the /api/set_widget route posts "SET_WIDGET:..." here; run_screen() reads it.
    display_queue = queue.Queue()
    # led_queue: the /api/set_pattern route posts "SET_PATTERN:..." here; run_leds() reads it.
    led_queue = queue.Queue()
    
    def json_response(data, status=200):
        """Serialise *data* to JSON and return it as a ``Response`` with the given HTTP status."""
        body = json.dumps(data)
        return Response(body, status=status, mimetype="application/json")
    
    def verify_json_request(req: Request, required_fields=None):
        """Check that *req* carries a JSON object containing the required fields.

        Args:
            req: The incoming request.
            required_fields: Names that must be present as top-level keys of the
                JSON body. ``None`` means no field is required.

        Returns:
            ``(True, None)`` when the request is acceptable, otherwise
            ``(False, response)`` where *response* is a ready-to-return 400
            JSON error response.
        """
        if required_fields is None:
            required_fields = []

        if not req.is_json:
            return False, json_response({
                "error": "Request is not JSON. Please make sure mimetype is set to application/json."
            }, 400)

        # The route handlers call ``request.json.get(...)``, so the body has to be
        # a JSON object. Without this check a JSON string would pass the ``in``
        # test below by substring match, and a JSON number would raise TypeError.
        payload = req.json
        if not isinstance(payload, dict):
            return False, json_response({
                "error": "Request body must be a JSON object."
            }, 400)

        for field in required_fields:
            if field not in payload:
                return False, json_response({
                    "error": f"Missing field {field}"
                }, 400)

        return True, None
    
    
    ##############
    # Web Routes #
    ##############
    
    @app.route("/")
    def wr_index():
        """Serve a small HTML landing page listing the available widgets and patterns."""
        landing_page = f"""
        <h1>LED Clock</h1>
        Available widgets: {PROP_AVAILABLE_WIDGETS}<br>
        Available patterns: {PROP_AVAILABLE_PATTERNS}<br>
        <a href="/api/get_config">Config</a>
        """
        return landing_page
    
    @app.route("/api/available_fonts")
    def wr_available_fonts():
        """Return the font names reported by ``pygame.font.get_fonts()`` as JSON."""
        font_names = pygame.font.get_fonts()
        return json_response(font_names)
    
    @app.route("/api/available_widgets")
    def wr_available_widgets():
        """Return the configured list of available widgets as JSON."""
        widgets = PROP_AVAILABLE_WIDGETS
        return json_response(widgets)
    
    @app.route("/api/available_patterns")
    def wr_available_patterns():
        """Return the configured list of available LED patterns as JSON."""
        patterns = PROP_AVAILABLE_PATTERNS
        return json_response(patterns)
    
    @app.route("/api/set_widget", methods=["POST"])
    def wr_set_widget():
        """Switch the display to another widget.

        Expects a JSON object with a ``"widget"`` name (must be one of
        ``PROP_AVAILABLE_WIDGETS``) and an optional ``"config"`` object of
        settings to override for this widget. On success a
        ``SET_WIDGET:<name>:<base64 JSON config>`` message is queued on
        ``display_queue`` and a JSON status document is returned; any
        validation failure yields a 400 JSON error.
        """
        # Make sure the request is valid
        valid, validation_response = verify_json_request(request, ["widget"])
        if not valid:
            return validation_response

        # Verify widget is valid and available
        widget_name = request.json.get("widget")
        if widget_name is None:
            return json_response({
                "error": "No widget provided"
            }, 400)

        if widget_name not in PROP_AVAILABLE_WIDGETS:
            return json_response({
                "error": f"Widget {widget_name} is not available"
            }, 400)

        config_override = request.json.get("config", {})
        if not isinstance(config_override, dict):
            return json_response({
                "error": "Config must be an Object"
            }, 400)

        # Base64 keeps the JSON payload free of ":" so the message can be split on it
        config_b64 = base64.b64encode(json.dumps(config_override).encode("utf-8")).decode("utf-8")

        # Send the widget to the display thread
        display_queue.put("SET_WIDGET:" + widget_name + ":" + config_b64)

        # Return success. "pattern" is the key this endpoint has always used
        # (copied from /api/set_pattern); it is kept so existing clients keep
        # working, alongside the correctly named "widget" key.
        return json_response({
            "status": "OK",
            "widget": widget_name,
            "pattern": widget_name
        })
    
    @app.route("/api/set_pattern", methods=["POST"])
    def wr_set_pattern():
        """Switch the LED string to another pattern.

        Expects a JSON object with a ``"pattern"`` name (must be one of
        ``PROP_AVAILABLE_PATTERNS``) and an optional ``"config"`` object of
        settings to override. On success a
        ``SET_PATTERN:<name>:<base64 JSON config>`` message is queued on
        ``led_queue``; validation failures yield a 400 JSON error.
        """
        # Reject anything that is not JSON or lacks the "pattern" field
        ok, error_response = verify_json_request(request, ["pattern"])
        if not ok:
            return error_response

        payload = request.json

        # The pattern must be named and must be one we know about
        pattern_name = payload.get("pattern")
        if pattern_name is None:
            return json_response({"error": "No pattern provided"}, 400)
        if pattern_name not in PROP_AVAILABLE_PATTERNS:
            return json_response({"error": f"Pattern {pattern_name} is not available"}, 400)

        # Optional per-request settings
        config_override = payload.get("config", {})
        if not isinstance(config_override, dict):
            return json_response({"error": "Config must be an Object"}, 400)

        # Encode the settings so they contain no ":" and hand the command to the LED thread
        config_json = json.dumps(config_override)
        config_b64 = base64.b64encode(config_json.encode("utf-8")).decode("utf-8")
        led_queue.put(":".join(("SET_PATTERN", pattern_name, config_b64)))

        return json_response({"status": "OK", "pattern": pattern_name})
    
    @app.route("/api/get_config")
    def wr_get_config():
        """Return the current ``module_config`` as JSON."""
        current_config = module_config
        return json_response(current_config)
    
    
    #####################
    # Loading functions #
    #####################
    
    def load_widget(widget_name: str, config_override: dict=None):
        """Instantiate the widget class called *widget_name* from ``screenwidgets``.

        The widget receives a properties dict built from a copy of its entry in
        ``module_config`` (empty if it has none), updated with *config_override*
        when one is given. For every numeric property whose key starts with
        ``scalable_``, an extra entry without that prefix is added holding the
        value multiplied by ``PROP_WINDOW_SCALE``. The ``pygame`` module itself
        is passed along under the key ``"pygame"``.
        """
        props = module_config.get(widget_name, {}).copy()
        if config_override is not None:
            props.update(config_override)

        # Work out the scaled variants first, then merge them in, so the dict
        # is never modified while it is being iterated
        scaled = {
            key.removeprefix("scalable_"): value * PROP_WINDOW_SCALE
            for key, value in props.items()
            if isinstance(value, (int, float)) and key.startswith("scalable_")
        }
        props.update(scaled)

        # Give the widget access to the pygame module
        props["pygame"] = pygame

        widget_class = getattr(screenwidgets, widget_name)
        return widget_class(props)
    
    def load_pattern(pattern_name: str, leds: ILedString, config_override: dict=None):
        """Instantiate the pattern class called *pattern_name* from ``ledpatterns``.

        The pattern is constructed with a properties dict and *leds*. The
        properties are a copy of the pattern's entry in ``module_config`` (empty
        if it has none), updated with *config_override* when one is given.
        """
        props = module_config.get(pattern_name, {}).copy()
        if config_override is not None:
            props.update(config_override)

        pattern_class = getattr(ledpatterns, pattern_name)
        return pattern_class(props, leds)
    
    def run_screen():
        """Run the pygame display loop until the window is closed.

        Creates the window, loads the first widget in ``PROP_AVAILABLE_WIDGETS``
        and then, once per frame: handles pygame events, clears the screen,
        processes at most one message from ``display_queue``, draws the current
        widget, optionally overlays an FPS counter, ticks the clock and flips
        the display. ``SET_WIDGET:<name>[:<base64 JSON config>]`` messages switch
        the current widget; other messages are ignored.

        When a ``pygame.QUIT`` event ends the loop, ``"EXIT"`` is posted to
        ``mainthread_queue`` and pygame is shut down.
        """
        # Initialise Pygame and create a window
        pygame.init()

        pg_options = 0x0
        if PROP_WINDOW_FULLSCREEN:
            pg_options |= pygame.FULLSCREEN
        screen = pygame.display.set_mode(
            (PROP_SCREEN_WIDTH * PROP_WINDOW_SCALE, PROP_SCREEN_HEIGHT * PROP_WINDOW_SCALE),
            pg_options
        )
        pygame.display.set_caption("Clock")
        pygame.mouse.set_visible(not PROP_HIDE_MOUSE)

        # Create a clock for FPS counter and timing
        clock = pygame.time.Clock()

        # Font for the FPS overlay: created on first use and reused afterwards
        # instead of being rebuilt on every frame
        fps_font = None

        # Init widgets
        widget_to_load = PROP_AVAILABLE_WIDGETS[0]  # TODO change to load multiple widgets

        current_widget = load_widget(widget_to_load)

        # Main loop
        running = True
        while running:
            for event in pygame.event.get():
                if event.type == pygame.QUIT:
                    running = False

            # Clear the screen
            screen.fill((0, 0, 0))

            # Check for messages (at most one per frame). Only the queue read is
            # guarded so that errors raised while handling a message are not
            # mistaken for an empty queue.
            try:
                message = display_queue.get_nowait()
            except queue.Empty:
                message = None

            if message is not None:
                parts = message.split(":")
                if parts[0] == "SET_WIDGET" and len(parts) > 1:
                    widget_to_load = parts[1]
                    if widget_to_load in PROP_AVAILABLE_WIDGETS: # Check input validity
                        config_override = {}
                        if len(parts) == 3:
                            try:
                                config_override = json.loads(base64.b64decode(parts[2]).decode("utf-8"))
                            except ValueError:
                                # Bad base64, bad UTF-8 and bad JSON all raise
                                # ValueError subclasses; fall back to no override
                                # rather than killing the display thread.
                                pass
                        current_widget = load_widget(widget_to_load, config_override)

            # Draw stuff here
            current_widget.draw(screen)

            # Display FPS counter
            if hw_config["show_fps"]:
                if fps_font is None:
                    fps_font = pygame.font.Font(None, 36)
                fps = clock.get_fps()
                text = fps_font.render(f"FPS: {fps:.2f}", True, (255, 255, 255))
                screen.blit(text, (10, 10))

            # Tick the clock (a non-positive fps_max means "do not cap the frame rate")
            if hw_config["fps_max"] > 0:
                clock.tick(hw_config["fps_max"])
            else:
                clock.tick()

            # Update the screen
            pygame.display.flip()

        # Send exit message to main thread
        mainthread_queue.put("EXIT")

        # Quit Pygame
        pygame.quit()
    
    def run_leds():
        """Drive the LED string until an ``"EXIT"`` message arrives.

        Creates the LED string selected by ``PROP_LEDS_DRIVER`` (``"dummy"`` or
        ``"npixel"``), loads the first pattern in ``PROP_AVAILABLE_PATTERNS`` and
        ticks it once. It then loops: one message from ``led_queue`` is handled
        if available, and, when the pattern's ``tick_rate`` from
        ``module_config`` is non-negative, the thread sleeps that many
        milliseconds and ticks the pattern again.

        ``SET_PATTERN:<name>[:<base64 JSON config>]`` switches to another
        available pattern (and ticks it once immediately); ``EXIT`` ends the loop.

        Raises:
            ValueError: if ``PROP_LEDS_DRIVER`` names an unknown driver.
        """
        # Load LEDs
        if PROP_LEDS_DRIVER == "dummy":
            from absled.dummy import DummyLedString
            leds = DummyLedString("", PROP_LEDS_LENGTH)
        elif PROP_LEDS_DRIVER == "npixel":
            from absled.npixel import NeopixelLedString
            leds = NeopixelLedString(PROP_LEDS_LOCATION, PROP_LEDS_LENGTH)
        else:
            raise ValueError("Invalid LED driver")

        current_pattern = load_pattern(PROP_AVAILABLE_PATTERNS[0], leds)
        # NOTE(review): tick_rate is read from module_config only, so a tick_rate
        # passed in a request's config override does not change the timer - confirm
        # whether that is intended.
        pattern_timer = module_config.get(PROP_AVAILABLE_PATTERNS[0], {}).get("tick_rate", -1)
        current_pattern.tick()

        while True:
            try:
                if pattern_timer >= 0:
                    # The periodic tick below paces the loop, so just peek
                    message = led_queue.get_nowait()
                else:
                    # No periodic tick: nothing happens until a message arrives,
                    # so block here instead of spinning at full CPU
                    message = led_queue.get()
            except queue.Empty:
                message = None

            if message is not None:
                parts = message.split(":")
                if parts[0] == "SET_PATTERN" and len(parts) > 1:
                    pattern_name = parts[1]
                    if pattern_name in PROP_AVAILABLE_PATTERNS:
                        config_override = {}
                        if len(parts) == 3:
                            try:
                                config_override = json.loads(base64.b64decode(parts[2]).decode("utf-8"))
                            except ValueError:
                                # Bad base64, bad UTF-8 and bad JSON all raise
                                # ValueError subclasses; fall back to no override
                                # rather than killing the LED thread.
                                pass
                        current_pattern = load_pattern(pattern_name, leds, config_override)
                        pattern_timer = module_config.get(pattern_name, {}).get("tick_rate", -1)
                        current_pattern.tick()
                elif message == "EXIT":
                    break

            if pattern_timer >= 0:
                # tick_rate is in milliseconds; time.sleep() takes seconds
                time.sleep(pattern_timer / 1000)
                current_pattern.tick()
    
    
    if __name__ == "__main__":
        # Build the effective module configuration before any worker needs it
        load_module_config()

        def serve_http():
            """Run the Flask server (reloader disabled) with the configured host and port."""
            app.run(debug=PROP_FLASK_DEBUG, host=PROP_FLASK_HOST, port=PROP_FLASK_PORT, use_reloader=False)

        # Start the display, the web server and the LED driver, in that order,
        # each on its own daemon thread
        for worker in (run_screen, serve_http, run_leds):
            threading.Thread(target=worker, daemon=True).start()

        # Keep the main thread alive, checking once a second whether the display
        # thread has asked for shutdown
        try:
            while True:
                try:
                    message = mainthread_queue.get_nowait()
                except queue.Empty:
                    message = None

                if message == "EXIT":
                    # Pass the shutdown request on to the worker queues
                    display_queue.put("EXIT")
                    led_queue.put("EXIT")
                    break

                time.sleep(1)
        except KeyboardInterrupt:
            # Ctrl+C: fall through to the normal exit below
            pass
        exit()