package main import ( "btclock/broker" _ "btclock/clients" "btclock/handlers" "btclock/modules" ws "btclock/websocket" "context" "fmt" "log" "net/http" "os" "os/signal" "syscall" "time" "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/filesystem" "github.com/vmihailenco/msgpack/v5" ) type Message struct { Type string `msgpack:"type" json:"type"` EventType string `msgpack:"eventType" json:"eventType"` Currencies []string `msgpack:"currencies" json:"currencies"` } type App struct { app *fiber.App eventBroker *broker.EventBroker channelManager *ws.ChannelManager } func NewApp() *App { return &App{ app: fiber.New(), } } func (a *App) initializeModules() error { a.eventBroker = broker.NewEventBroker() m := modules.Registry.GetAllModules() log.Printf("Initializing %d modules", len(m)) for _, module := range m { if err := module.Init(a.eventBroker); err != nil { return fmt.Errorf("failed to initialize module %s: %w", module.ID(), err) } log.Printf("Module initialized: %s", module.ID()) } for _, module := range m { go func(m modules.Module) { if err := m.Start(context.Background()); err != nil { log.Printf("Module %s error: %v", m.ID(), err) } }(module) } return nil } func (a *App) setupWebSocketHandlers() { a.channelManager = ws.NewChannelManager() go a.channelManager.Start() // V2 handler setup v2handler := &handlers.WebSocketV2Handler{Cm: a.channelManager} a.eventBroker.Register("ticker", v2handler) a.eventBroker.Register("mempool-fee-rate", v2handler) a.eventBroker.Register("mempool-block", v2handler) // V1 handler setup v1handler := &handlers.WebSocketV1Handler{Cm: a.channelManager} a.eventBroker.Register("ticker", v1handler) a.eventBroker.Register("mempool-fee-rate", v1handler) a.eventBroker.Register("mempool-block", v1handler) } func (a *App) setupRoutes() { // Static file serving a.app.Use(filesystem.New(filesystem.Config{ Root: http.Dir("./static"), Index: "index.html", Browse: false, })) // WebSocket routes a.app.Get("/api/v1/ws", websocket.New(a.handleWebSocketV1)) a.app.Get("/api/v2/ws", websocket.New(a.handleWebSocket)) } func (a *App) handleWebSocketV1(c *websocket.Conn) { client := &ws.Client{ Conn: c, Channels: make(map[string]bool), } a.channelManager.Register(client) defer a.channelManager.Unregister(client) // Subscribe to default channels client.Subscribe("blockheight-v1", a.channelManager) client.Subscribe("blockfee-v1", a.channelManager) client.Subscribe("price-v1", a.channelManager) for { _, _, err := c.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { log.Printf("WebSocket connection closed: %v", err) } break } } } func (a *App) handleWebSocket(c *websocket.Conn) { client := &ws.Client{ Conn: c, Channels: make(map[string]bool), } a.channelManager.Register(client) defer a.channelManager.Unregister(client) for { messageType, message, err := c.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { log.Printf("WebSocket connection closed: %v", err) } break } if messageType == websocket.BinaryMessage { if err := a.handleBinaryMessage(client, message); err != nil { log.Printf("Error handling message: %v", err) } } } } func (a *App) handleBinaryMessage(client *ws.Client, message []byte) error { var msg Message if err := msgpack.Unmarshal(message, &msg); err != nil { return fmt.Errorf("error unmarshalling message: %w", err) } switch msg.Type { case "subscribe": return a.handleSubscribe(client, msg) case "unsubscribe": return a.handleUnsubscribe(client, msg) default: return fmt.Errorf("unknown message type: %s", msg.Type) } } func (a *App) handleSubscribe(client *ws.Client, msg Message) error { if msg.EventType == "price" { for _, currency := range msg.Currencies { channel := fmt.Sprintf("price:%s", currency) client.Subscribe(channel, a.channelManager) log.Printf("[%s] Subscribed to channel: %s", client.Conn.RemoteAddr().String(), channel) } } else { client.Subscribe(msg.EventType, a.channelManager) log.Printf("[%s] Subscribed to channel: %s", client.Conn.RemoteAddr().String(), msg.EventType) } return nil } func (a *App) handleUnsubscribe(client *ws.Client, msg Message) error { if msg.EventType == "price" { for _, currency := range msg.Currencies { channel := fmt.Sprintf("price:%s", currency) client.Unsubscribe(channel) log.Printf("[%s] Unsubscribed from channel: %s", client.Conn.RemoteAddr().String(), channel) } } else { client.Unsubscribe(msg.EventType) log.Printf("[%s] Unsubscribed from channel: %s", client.Conn.RemoteAddr().String(), msg.EventType) } return nil } func (a *App) start() error { port := os.Getenv("PORT") if port == "" { port = "80" } // Create a channel to listen for OS signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) // Start the server in a goroutine go func() { if err := a.app.Listen(":" + port); err != nil { log.Printf("Server error: %v", err) } }() // Wait for interrupt signal <-sigChan log.Println("Shutting down server...") // Create a timeout context for graceful shutdown ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // Attempt graceful shutdown if err := a.app.ShutdownWithContext(ctx); err != nil { return fmt.Errorf("error during server shutdown: %w", err) } return nil } func main() { app := NewApp() if err := app.initializeModules(); err != nil { log.Fatalf("Failed to initialize modules: %v", err) } app.setupWebSocketHandlers() app.setupRoutes() if err := app.start(); err != nil { log.Fatalf("Server error: %v", err) } }