From c5111cb551dce693a2f72e7f88946028b1c034d6 Mon Sep 17 00:00:00 2001 From: Djuri Baars Date: Sun, 18 May 2025 23:54:04 +0200 Subject: [PATCH] Initial commit --- .env | 1 + Dockerfile | 30 +++++ README.md | 124 +++++++++++++++++++++ broker/broker.go | 53 +++++++++ clients/kraken.go | 138 +++++++++++++++++++++++ clients/kraken_test.go | 102 +++++++++++++++++ clients/mempool.go | 202 ++++++++++++++++++++++++++++++++++ clients/mempool_test.go | 184 +++++++++++++++++++++++++++++++ clients/mock_broker.go | 35 ++++++ docker-compose.yaml | 14 +++ go.mod | 37 +++++++ go.sum | 96 ++++++++++++++++ handlers/v1.go | 54 +++++++++ handlers/v2.go | 50 +++++++++ main.go | 235 ++++++++++++++++++++++++++++++++++++++++ models/mempool.go | 74 +++++++++++++ models/ticker.go | 8 ++ modules/main.go | 39 +++++++ static/index.html | 159 +++++++++++++++++++++++++++ websocket/main.go | 129 ++++++++++++++++++++++ 20 files changed, 1764 insertions(+) create mode 100644 .env create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 broker/broker.go create mode 100644 clients/kraken.go create mode 100644 clients/kraken_test.go create mode 100644 clients/mempool.go create mode 100644 clients/mempool_test.go create mode 100644 clients/mock_broker.go create mode 100644 docker-compose.yaml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 handlers/v1.go create mode 100644 handlers/v2.go create mode 100644 main.go create mode 100644 models/mempool.go create mode 100644 models/ticker.go create mode 100644 modules/main.go create mode 100644 static/index.html create mode 100644 websocket/main.go diff --git a/.env b/.env new file mode 100644 index 0000000..c0c68b1 --- /dev/null +++ b/.env @@ -0,0 +1 @@ +PORT=3000 \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1ed6af6 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,30 @@ +# Build stage +FROM golang:1.24.3-alpine AS builder + +WORKDIR /app + +# Copy go mod and sum files +COPY go.mod ./ + +# Download dependencies +RUN go mod download + +# Copy source code +COPY . . + +# Build the application +RUN CGO_ENABLED=0 GOOS=linux go build -o main . + +# Final stage +FROM alpine:latest + +WORKDIR /app + +# Copy the binary from builder +COPY --from=builder /app/main . + +# Expose the port +EXPOSE ${PORT} + +# Run the application +CMD ["./main"] \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..9e977cf --- /dev/null +++ b/README.md @@ -0,0 +1,124 @@ +# BTClock Go Server + +A high-performance data service for the BTClock, providing real-time Bitcoin data through WebSocket connections. + +## Features + +- Real-time Bitcoin price updates +- Mempool fee rate monitoring +- Block height tracking +- WebSocket API (v1 and v2) +- Docker support +- Graceful shutdown handling +- Modular architecture + +## Prerequisites + +- Go 1.21 or later +- Docker and Docker Compose (optional) + +## Installation + +### Local Development + +1. Clone the repository: +```bash +git clone https://github.com/yourusername/btclock-go-server.git +cd btclock-go-server +``` + +2. Install dependencies: +```bash +go mod download +``` + +3. Run the server: +```bash +go run main.go +``` + +### Docker Deployment + +1. Build and run using Docker Compose: +```bash +docker-compose up --build +``` + +## Configuration + +The server can be configured using environment variables: + +| Variable | Description | Default | +|----------|-------------|---------| +| PORT | Server port | 80 | + +## API Documentation + +### WebSocket API v2 + +Connect to `ws://your-server/api/v2/ws` for the latest API version. + +#### Subscribe to Events + +Send a binary message with the following structure: + +```json +{ + "type": "subscribe", + "eventType": "price", + "currencies": ["USD", "EUR"] +} +``` + +Available event types: +- `price`: Bitcoin price updates +- `mempool-fee-rate`: Mempool fee rate updates +- `mempool-block`: New block notifications + +#### Unsubscribe from Events + +```json +{ + "type": "unsubscribe", + "eventType": "price", + "currencies": ["USD", "EUR"] +} +``` + +### WebSocket API v1 + +Connect to `ws://your-server/api/v1/ws` for the legacy API version. + +Automatically subscribes to: +- `blockheight-v1`: Block height updates +- `blockfee-v1`: Fee rate updates +- `price-v1`: Price updates + +## Project Structure + +``` +. +├── broker/ # Event broker implementation +├── clients/ # External service clients +├── handlers/ # WebSocket request handlers +├── models/ # Data models +├── modules/ # Service modules +├── static/ # Static web assets +├── websocket/ # WebSocket utilities +├── main.go # Application entry point +├── Dockerfile # Docker configuration +└── docker-compose.yaml +``` + +## Development + +### Testing + +Run the test suite: +```bash +go test ./... +``` + +## License + +This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details. \ No newline at end of file diff --git a/broker/broker.go b/broker/broker.go new file mode 100644 index 0000000..4003db7 --- /dev/null +++ b/broker/broker.go @@ -0,0 +1,53 @@ +package broker + +import ( + "context" + "sync" +) + +type EventBroker struct { + handlers map[string][]EventHandler + mu sync.RWMutex +} + +type EventHandler interface { + Handle(ctx context.Context, event Event) error +} + +type Event interface { + Name() string + Payload() interface{} +} + +func NewEventBroker() *EventBroker { + return &EventBroker{ + handlers: make(map[string][]EventHandler), + } +} + +func (b *EventBroker) Register(eventType string, handler EventHandler) { + b.mu.Lock() + defer b.mu.Unlock() + b.handlers[eventType] = append(b.handlers[eventType], handler) +} + +func (b *EventBroker) Publish(eventType string, event Event) { + b.mu.RLock() + defer b.mu.RUnlock() + for _, handler := range b.handlers[eventType] { + handler.Handle(context.Background(), event) + } +} + +func (b *EventBroker) Unregister(eventType string, handler EventHandler) { + b.mu.Lock() + defer b.mu.Unlock() + handlers := b.handlers[eventType] + for i, h := range handlers { + if h == handler { + // Remove the handler from the slice + b.handlers[eventType] = append(handlers[:i], handlers[i+1:]...) + break + } + } +} diff --git a/clients/kraken.go b/clients/kraken.go new file mode 100644 index 0000000..d729882 --- /dev/null +++ b/clients/kraken.go @@ -0,0 +1,138 @@ +package clients + +import ( + "btclock/broker" + "btclock/models" + "btclock/modules" + "context" + "log" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" +) + +type KrakenClient struct { + ws *websocket.Conn + eventTypes []string + broker *broker.EventBroker +} + +type KrakenTickerEvent struct { + Symbol string + Last float64 +} + +// Name implements broker.Event. +func (k KrakenTickerEvent) Name() string { + return "ticker" +} + +// Payload implements broker.Event. +func (k KrakenTickerEvent) Payload() interface{} { + return map[string]interface{}{ + "symbol": k.Symbol, + "last": k.Last, + } +} + +func init() { + modules.Registry.Register(NewKrakenClient()) +} + +func NewKrakenClient() *KrakenClient { + return &KrakenClient{ + eventTypes: []string{"ticker"}, + } +} + +func (k *KrakenClient) Init(broker *broker.EventBroker) error { + k.broker = broker + + return nil +} + +func (k *KrakenClient) Start(ctx context.Context) error { + ws, _, err := websocket.Dial(ctx, "wss://ws.kraken.com/v2", nil) + if err != nil { + log.Fatal(err) + } + defer ws.CloseNow() + + err = wsjson.Write(ctx, ws, map[string]interface{}{ + "method": "subscribe", + "params": map[string]interface{}{ + "channel": "ticker", + "symbol": []string{"BTC/USD", "BTC/EUR", "BTC/GBP", "BTC/JPY", "BTC/CHF", "BTC/CAD", "BTC/AUD"}, + }, + }) + if err != nil { + log.Fatal(err) + } + + // Expect subscription confirmation + var v map[string]interface{} + err = wsjson.Read(ctx, ws, &v) + if err != nil { + log.Fatal(err) + } + + // Read messages until the connection is closed + for { + var v map[string]any + err = wsjson.Read(ctx, ws, &v) + if err != nil { + log.Fatal(err) + } + channel, ok := v["channel"].(string) + if !ok { + continue + } + if channel == "ticker" { + tickerData := handleTickerMessage(v) + tickerEvent := KrakenTickerEvent{ + Symbol: tickerData.Symbol, + Last: tickerData.Last, + } + k.broker.Publish("ticker", tickerEvent) + } + } +} + +func (k *KrakenClient) Stop() error { + k.ws.Close(websocket.StatusNormalClosure, "") + + return nil +} + +func (k *KrakenClient) ID() string { + return "kraken" +} + +func handleTickerMessage(v map[string]any) models.Ticker { + data, ok := v["data"].([]interface{}) + if !ok { + return models.Ticker{} + } + + for _, item := range data { + if ticker, ok := item.(map[string]interface{}); ok { + symbol, symbolOk := ticker["symbol"].(string) + last, lastOk := ticker["last"].(float64) + bid, bidOk := ticker["bid"].(float64) + ask, askOk := ticker["ask"].(float64) + + if symbolOk && lastOk && bidOk && askOk { + ticker := models.Ticker{ + Symbol: symbol, + Last: last, + Bid: bid, + Ask: ask, + } + // fmt.Printf("%+v\n", ticker) + return ticker + } + } + } + + return models.Ticker{} +} diff --git a/clients/kraken_test.go b/clients/kraken_test.go new file mode 100644 index 0000000..eba127e --- /dev/null +++ b/clients/kraken_test.go @@ -0,0 +1,102 @@ +package clients + +import ( + "btclock/broker" + "btclock/models" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestKrakenClient_HandleTickerMessage(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedTicker models.Ticker + }{ + { + name: "valid ticker message", + input: map[string]any{ + "data": []any{ + map[string]any{ + "symbol": "BTC/USD", + "last": 50000.0, + "bid": 49900.0, + "ask": 50100.0, + }, + }, + }, + expectedTicker: models.Ticker{ + Symbol: "BTC/USD", + Last: 50000.0, + Bid: 49900.0, + Ask: 50100.0, + }, + }, + { + name: "invalid data format", + input: map[string]any{ + "data": "invalid", + }, + expectedTicker: models.Ticker{}, + }, + { + name: "empty data array", + input: map[string]any{ + "data": []any{}, + }, + expectedTicker: models.Ticker{}, + }, + { + name: "missing required fields", + input: map[string]any{ + "data": []any{ + map[string]any{ + "symbol": "BTC/USD", + // missing last, bid, ask + }, + }, + }, + expectedTicker: models.Ticker{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := handleTickerMessage(tt.input) + assert.Equal(t, tt.expectedTicker, result) + }) + } +} + +func TestKrakenClient_Init(t *testing.T) { + broker := broker.NewEventBroker() + client := NewKrakenClient() + + err := client.Init(broker) + assert.NoError(t, err) + assert.Equal(t, broker, client.broker) +} + +func TestKrakenClient_ID(t *testing.T) { + client := NewKrakenClient() + assert.Equal(t, "kraken", client.ID()) +} + +func TestKrakenTickerEvent_Name(t *testing.T) { + event := KrakenTickerEvent{ + Symbol: "BTC/USD", + Last: 50000.0, + } + assert.Equal(t, "ticker", event.Name()) +} + +func TestKrakenTickerEvent_Payload(t *testing.T) { + event := KrakenTickerEvent{ + Symbol: "BTC/USD", + Last: 50000.0, + } + payload := event.Payload().(map[string]interface{}) + assert.Equal(t, "BTC/USD", payload["symbol"]) + assert.Equal(t, 50000.0, payload["last"]) +} diff --git a/clients/mempool.go b/clients/mempool.go new file mode 100644 index 0000000..31c800e --- /dev/null +++ b/clients/mempool.go @@ -0,0 +1,202 @@ +package clients + +import ( + "btclock/broker" + "btclock/models" + "btclock/modules" + "context" + "fmt" + "log" + + coderws "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" +) + +type MempoolClient struct { + ws *coderws.Conn + broker *broker.EventBroker + eventTypes []string +} + +type MempoolBlockEvent struct { + broker.Event + Block models.Block +} + +// Name implements broker.Event. +func (m MempoolBlockEvent) Name() string { + return "mempool-block" +} + +// Payload implements broker.Event. +func (m MempoolBlockEvent) Payload() interface{} { + return m.Block.Height +} + +type MempoolFeeRateEvent struct { + broker.Event + FeeRate float64 +} + +// Name implements broker.Event. +func (m MempoolFeeRateEvent) Name() string { + return "mempool-fee-rate" +} + +// Payload implements broker.Event. +func (m MempoolFeeRateEvent) Payload() interface{} { + return m.FeeRate +} + +func init() { + modules.Registry.Register(NewMempoolClient()) +} + +func NewMempoolClient() *MempoolClient { + return &MempoolClient{ + eventTypes: []string{"block", "feeRate"}, + } +} + +func (m *MempoolClient) Init(broker *broker.EventBroker) error { + m.broker = broker + return nil +} + +func (m *MempoolClient) Start(ctx context.Context) error { + ws, _, err := coderws.Dial(ctx, "wss://mempool.dbtc.link/api/v1/ws", nil) + if err != nil { + return fmt.Errorf("failed to connect to mempool websocket: %w", err) + } + m.ws = ws + + defer func() { + if err := m.ws.Close(coderws.StatusNormalClosure, ""); err != nil { + log.Printf("error closing websocket: %v", err) + } + }() + + if err := wsjson.Write(ctx, m.ws, createSubscriptionMessage()); err != nil { + return fmt.Errorf("failed to send subscription message: %w", err) + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + var v map[string]any + if err := wsjson.Read(ctx, m.ws, &v); err != nil { + return fmt.Errorf("error reading message: %w", err) + } + + for key := range v { + switch key { + case "mempool-blocks": + if err := m.handleMempoolBlocksMessage(v); err != nil { + log.Printf("error handling mempool blocks: %v", err) + } + case "block": + if err := m.handleBlocksMessage(v); err != nil { + log.Printf("error handling block: %v", err) + } + case "blocks": + if err := m.handleBlocksArrayMessage(v); err != nil { + log.Printf("error handling blocks array: %v", err) + } + } + break // Only process the first key + } + } + } +} + +func (m *MempoolClient) Stop() error { + if m.ws != nil { + return m.ws.Close(coderws.StatusNormalClosure, "") + } + return nil +} + +func (m *MempoolClient) ID() string { + return "mempool" +} + +func createSubscriptionMessage() map[string]any { + return map[string]any{ + "action": "want", + "data": []string{"mempool-blocks", "blocks"}, + } +} + +func (m *MempoolClient) handleMempoolBlocksMessage(v map[string]any) error { + blocksRaw, ok := v["mempool-blocks"].([]any) + if !ok { + return fmt.Errorf("unexpected mempool-blocks format") + } + for _, block := range blocksRaw { + blockMap, ok := block.(map[string]any) + if !ok { + continue + } + var mb models.MempoolBlock + // manual mapping since we have interface{} + if bs, ok := blockMap["blockSize"].(float64); ok { + mb.BlockSize = int(bs) + } + if bvs, ok := blockMap["blockVSize"].(float64); ok { + mb.BlockVSize = bvs + } + if ntx, ok := blockMap["nTx"].(float64); ok { + mb.NTx = int(ntx) + } + if tf, ok := blockMap["totalFees"].(float64); ok { + mb.TotalFees = int(tf) + } + if mf, ok := blockMap["medianFee"].(float64); ok { + mb.MedianFee = mf + } + if fr, ok := blockMap["feeRange"].([]any); ok { + mb.FeeRange = make([]float64, len(fr)) + for i, v := range fr { + if f, ok := v.(float64); ok { + mb.FeeRange[i] = f + } + } + } + m.broker.Publish("mempool-fee-rate", MempoolFeeRateEvent{FeeRate: mb.MedianFee}) + break + } + return nil +} + +func (m *MempoolClient) handleBlocksMessage(v map[string]any) error { + blockRaw, ok := v["block"].(map[string]any) + if !ok { + return fmt.Errorf("unexpected blocks format") + } + var b models.Block + if height, ok := blockRaw["height"].(float64); ok { + b.Height = int(height) + m.broker.Publish("mempool-block", MempoolBlockEvent{Block: b}) + } + return nil +} + +func (m *MempoolClient) handleBlocksArrayMessage(v map[string]any) error { + blocks, ok := v["blocks"].([]any) + if !ok || len(blocks) == 0 { + return fmt.Errorf("unexpected blocks format or empty blocks array") + } + + blockRaw, ok := blocks[len(blocks)-1].(map[string]any) + if !ok { + return fmt.Errorf("unexpected block format in blocks array") + } + + if height, ok := blockRaw["height"].(float64); ok { + log.Printf("Last block height: %d", int(height)) + m.broker.Publish("mempool-block", MempoolBlockEvent{Block: models.Block{Height: int(height)}}) + } + return nil +} diff --git a/clients/mempool_test.go b/clients/mempool_test.go new file mode 100644 index 0000000..ce58c01 --- /dev/null +++ b/clients/mempool_test.go @@ -0,0 +1,184 @@ +package clients + +import ( + "btclock/broker" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMempoolClient_HandleMempoolBlocksMessage(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedError bool + expectedFee float64 + }{ + { + name: "valid mempool blocks message", + input: map[string]any{ + "mempool-blocks": []any{ + map[string]any{ + "blockSize": 1000.0, + "blockVSize": 2000.0, + "nTx": 100.0, + "totalFees": 500.0, + "medianFee": 5.5, + "feeRange": []any{1.0, 2.0, 3.0}, + }, + }, + }, + expectedError: false, + expectedFee: 5.5, + }, + { + name: "invalid mempool blocks format", + input: map[string]any{ + "mempool-blocks": "invalid", + }, + expectedError: true, + }, + { + name: "empty mempool blocks", + input: map[string]any{ + "mempool-blocks": []any{}, + }, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker := broker.NewEventBroker() + client := &MempoolClient{ + broker: broker, + } + + err := client.handleMempoolBlocksMessage(tt.input) + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMempoolClient_HandleBlocksMessage(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedError bool + expectedHeight int + }{ + { + name: "valid block message", + input: map[string]any{ + "block": map[string]any{ + "height": 800000.0, + }, + }, + expectedError: false, + expectedHeight: 800000, + }, + { + name: "invalid block format", + input: map[string]any{ + "block": "invalid", + }, + expectedError: true, + }, + { + name: "missing height", + input: map[string]any{ + "block": map[string]any{}, + }, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker := broker.NewEventBroker() + client := &MempoolClient{ + broker: broker, + } + + err := client.handleBlocksMessage(tt.input) + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMempoolClient_HandleBlocksArrayMessage(t *testing.T) { + tests := []struct { + name string + input map[string]any + expectedError bool + expectedHeight int + }{ + { + name: "valid blocks array message", + input: map[string]any{ + "blocks": []any{ + map[string]any{ + "height": 799999.0, + }, + map[string]any{ + "height": 800000.0, + }, + }, + }, + expectedError: false, + expectedHeight: 800000, + }, + { + name: "invalid blocks format", + input: map[string]any{ + "blocks": "invalid", + }, + expectedError: true, + }, + { + name: "empty blocks array", + input: map[string]any{ + "blocks": []any{}, + }, + expectedError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + broker := broker.NewEventBroker() + client := &MempoolClient{ + broker: broker, + } + + err := client.handleBlocksArrayMessage(tt.input) + if tt.expectedError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestMempoolClient_Init(t *testing.T) { + broker := broker.NewEventBroker() + client := NewMempoolClient() + + err := client.Init(broker) + assert.NoError(t, err) + assert.Equal(t, broker, client.broker) +} + +func TestMempoolClient_ID(t *testing.T) { + client := NewMempoolClient() + assert.Equal(t, "mempool", client.ID()) +} diff --git a/clients/mock_broker.go b/clients/mock_broker.go new file mode 100644 index 0000000..a6ca245 --- /dev/null +++ b/clients/mock_broker.go @@ -0,0 +1,35 @@ +package clients + +import ( + "btclock/broker" + + "github.com/stretchr/testify/mock" +) + +// MockEventBroker implements broker.EventBroker for testing +type MockEventBroker struct { + mock.Mock + realBroker *broker.EventBroker +} + +func NewMockEventBroker() *broker.EventBroker { + mock := &MockEventBroker{ + realBroker: broker.NewEventBroker(), + } + return mock.realBroker +} + +func (m *MockEventBroker) Register(eventType string, handler broker.EventHandler) { + m.Called(eventType, handler) + m.realBroker.Register(eventType, handler) +} + +func (m *MockEventBroker) Publish(eventType string, event broker.Event) { + m.Called(eventType, event) + m.realBroker.Publish(eventType, event) +} + +func (m *MockEventBroker) Unregister(eventType string, handler broker.EventHandler) { + m.Called(eventType, handler) + m.realBroker.Unregister(eventType, handler) +} diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..c3e9249 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,14 @@ +version: '3.8' + +services: + app: + build: + context: . + dockerfile: Dockerfile + ports: + - "${PORT}:${PORT}" + environment: + - PORT=${PORT} + volumes: + - ./static:/app/static + restart: unless-stopped \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a12ff24 --- /dev/null +++ b/go.mod @@ -0,0 +1,37 @@ +module btclock + +go 1.24.3 + +require ( + github.com/coder/websocket v1.8.13 + github.com/gofiber/contrib/websocket v1.3.4 + github.com/gofiber/fiber/v2 v2.52.6 + github.com/stretchr/testify v1.10.0 + github.com/vmihailenco/msgpack v4.0.4+incompatible + github.com/vmihailenco/msgpack/v5 v5.4.1 +) + +require ( + github.com/andybalholm/brotli v1.1.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/fasthttp/websocket v1.5.12 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/klauspost/compress v1.18.0 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/mattn/go-runewidth v0.0.16 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/rivo/uniseg v0.4.7 // indirect + github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287 // indirect + github.com/stretchr/objx v0.5.2 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.62.0 // indirect + github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sys v0.33.0 // indirect + google.golang.org/appengine v1.6.8 // indirect + google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..5c72acc --- /dev/null +++ b/go.sum @@ -0,0 +1,96 @@ +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE= +github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE= +github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= +github.com/gofiber/contrib/websocket v1.3.4 h1:tWeBdbJ8q0WFQXariLN4dBIbGH9KBU75s0s7YXplOSg= +github.com/gofiber/contrib/websocket v1.3.4/go.mod h1:kTFBPC6YENCnKfKx0BoOFjgXxdz7E85/STdkmZPEmPs= +github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI= +github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= +github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= +github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287 h1:qIQ0tWF9vxGtkJa24bR+2i53WBCz1nW/Pc47oVYauC4= +github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= +github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= +github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.62.0 h1:8dKRBX/y2rCzyc6903Zu1+3qN0H/d2MsxPPmVNamiH0= +github.com/valyala/fasthttp v1.62.0/go.mod h1:FCINgr4GKdKqV8Q0xv8b+UxPV+H/O5nNFo3D+r54Htg= +github.com/vmihailenco/msgpack v4.0.4+incompatible h1:dSLoQfGFAo3F6OoNhwUmLwVgaUXK79GlxNBwueZn0xI= +github.com/vmihailenco/msgpack v4.0.4+incompatible/go.mod h1:fy3FlTQTDXWkZ7Bh6AcGMlsjHatGryHQYUTf1ShIgkk= +github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IUPn0Bjt8= +github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok= +github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g= +github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= +golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.6.8 h1:IhEN5q69dyKagZPYMSdIjS2HqprW324FRQZJcGqPAsM= +google.golang.org/appengine v1.6.8/go.mod h1:1jJ3jBArFh5pcgW8gCtRJnepW8FzD1V44FJffLiz/Ds= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handlers/v1.go b/handlers/v1.go new file mode 100644 index 0000000..27e5638 --- /dev/null +++ b/handlers/v1.go @@ -0,0 +1,54 @@ +package handlers + +import ( + "btclock/broker" + ws "btclock/websocket" + "context" + "encoding/json" + "fmt" + "math" + + "github.com/gofiber/contrib/websocket" +) + +// WebSocketHandler implements broker.EventHandler for a websocket connection +type WebSocketV1Handler struct { + Cm *ws.ChannelManager +} + +func (h *WebSocketV1Handler) Handle(ctx context.Context, event broker.Event) error { + channel := event.Name() + payload := map[string]interface{}{} + + switch event.Name() { + case "ticker": + payloadMap := event.Payload().(map[string]interface{}) + if payloadMap["symbol"] != "BTC/USD" { + return fmt.Errorf("unknown symbol: %s", payloadMap["symbol"]) + } + payload["bitcoin"] = int(math.Round(payloadMap["last"].(float64))) + channel = "price-v1" + case "mempool-fee-rate": + payload["mempool-blocks"] = []map[string]interface{}{ + { + "medianFee": int(math.Round(event.Payload().(float64))), + }, + } + channel = "blockfee-v1" + case "mempool-block": + payload["block"] = map[string]interface{}{ + "height": event.Payload(), + } + channel = "blockheight-v1" + default: + return fmt.Errorf("unknown event type: %s", event.Name()) + } + + b, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("error marshalling payload: %v", err) + } + + h.Cm.BroadcastTo(channel, b, websocket.TextMessage) + return nil +} diff --git a/handlers/v2.go b/handlers/v2.go new file mode 100644 index 0000000..9bc4a85 --- /dev/null +++ b/handlers/v2.go @@ -0,0 +1,50 @@ +package handlers + +import ( + "btclock/broker" + ws "btclock/websocket" + "context" + "fmt" + "math" + "strings" + + "github.com/gofiber/contrib/websocket" + "github.com/vmihailenco/msgpack" +) + +// WebSocketHandler implements broker.EventHandler for a websocket connection +type WebSocketV2Handler struct { + Cm *ws.ChannelManager +} + +func (h *WebSocketV2Handler) Handle(ctx context.Context, event broker.Event) error { + channel := event.Name() + payload := map[string]interface{}{} + + switch event.Name() { + case "ticker": + payloadMap := event.Payload().(map[string]interface{}) + + symbol := strings.TrimPrefix(event.Payload().(map[string]interface{})["symbol"].(string), "BTC/") + payload["price"] = map[string]interface{}{ + symbol: int(math.Round(payloadMap["last"].(float64))), + } + channel = "price:" + symbol + case "mempool-fee-rate": + payload["blockfee"] = int(math.Round(event.Payload().(float64))) + channel = "blockfee" + case "mempool-block": + payload["blockheight"] = event.Payload() + channel = "blockheight" + default: + return fmt.Errorf("unknown event type: %s", event.Name()) + } + + b, err := msgpack.Marshal(payload) + if err != nil { + return fmt.Errorf("error marshalling payload: %v", err) + } + + h.Cm.BroadcastTo(channel, b, websocket.BinaryMessage) + return nil +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..bf93d36 --- /dev/null +++ b/main.go @@ -0,0 +1,235 @@ +package main + +import ( + "btclock/broker" + _ "btclock/clients" + "btclock/handlers" + "btclock/modules" + ws "btclock/websocket" + "context" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/gofiber/contrib/websocket" + "github.com/gofiber/fiber/v2" + "github.com/gofiber/fiber/v2/middleware/filesystem" + "github.com/vmihailenco/msgpack/v5" +) + +type Message struct { + Type string `msgpack:"type" json:"type"` + EventType string `msgpack:"eventType" json:"eventType"` + Currencies []string `msgpack:"currencies" json:"currencies"` +} + +type App struct { + app *fiber.App + eventBroker *broker.EventBroker + channelManager *ws.ChannelManager +} + +func NewApp() *App { + return &App{ + app: fiber.New(), + } +} + +func (a *App) initializeModules() error { + a.eventBroker = broker.NewEventBroker() + m := modules.Registry.GetAllModules() + + log.Printf("Initializing %d modules", len(m)) + + for _, module := range m { + if err := module.Init(a.eventBroker); err != nil { + return fmt.Errorf("failed to initialize module %s: %w", module.ID(), err) + } + log.Printf("Module initialized: %s", module.ID()) + } + + for _, module := range m { + go func(m modules.Module) { + if err := m.Start(context.Background()); err != nil { + log.Printf("Module %s error: %v", m.ID(), err) + } + }(module) + } + + return nil +} + +func (a *App) setupWebSocketHandlers() { + a.channelManager = ws.NewChannelManager() + go a.channelManager.Start() + + // V2 handler setup + v2handler := &handlers.WebSocketV2Handler{Cm: a.channelManager} + a.eventBroker.Register("ticker", v2handler) + a.eventBroker.Register("mempool-fee-rate", v2handler) + a.eventBroker.Register("mempool-block", v2handler) + + // V1 handler setup + v1handler := &handlers.WebSocketV1Handler{Cm: a.channelManager} + a.eventBroker.Register("ticker", v1handler) + a.eventBroker.Register("mempool-fee-rate", v1handler) + a.eventBroker.Register("mempool-block", v1handler) +} + +func (a *App) setupRoutes() { + // Static file serving + a.app.Use(filesystem.New(filesystem.Config{ + Root: http.Dir("./static"), + Index: "index.html", + Browse: false, + })) + + // WebSocket routes + a.app.Get("/api/v1/ws", websocket.New(a.handleWebSocketV1)) + a.app.Get("/api/v2/ws", websocket.New(a.handleWebSocket)) +} + +func (a *App) handleWebSocketV1(c *websocket.Conn) { + client := &ws.Client{ + Conn: c, + Channels: make(map[string]bool), + } + a.channelManager.Register(client) + defer a.channelManager.Unregister(client) + + // Subscribe to default channels + client.Subscribe("blockheight-v1", a.channelManager) + client.Subscribe("blockfee-v1", a.channelManager) + client.Subscribe("price-v1", a.channelManager) + + for { + _, _, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + log.Printf("WebSocket connection closed: %v", err) + } + break + } + } +} + +func (a *App) handleWebSocket(c *websocket.Conn) { + client := &ws.Client{ + Conn: c, + Channels: make(map[string]bool), + } + a.channelManager.Register(client) + defer a.channelManager.Unregister(client) + + for { + messageType, message, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) { + log.Printf("WebSocket connection closed: %v", err) + } + break + } + + if messageType == websocket.BinaryMessage { + if err := a.handleBinaryMessage(client, message); err != nil { + log.Printf("Error handling message: %v", err) + } + } + } +} + +func (a *App) handleBinaryMessage(client *ws.Client, message []byte) error { + var msg Message + if err := msgpack.Unmarshal(message, &msg); err != nil { + return fmt.Errorf("error unmarshalling message: %w", err) + } + + switch msg.Type { + case "subscribe": + return a.handleSubscribe(client, msg) + case "unsubscribe": + return a.handleUnsubscribe(client, msg) + default: + return fmt.Errorf("unknown message type: %s", msg.Type) + } +} + +func (a *App) handleSubscribe(client *ws.Client, msg Message) error { + if msg.EventType == "price" { + for _, currency := range msg.Currencies { + channel := fmt.Sprintf("price:%s", currency) + client.Subscribe(channel, a.channelManager) + log.Printf("[%s] Subscribed to channel: %s", client.Conn.RemoteAddr().String(), channel) + } + } else { + client.Subscribe(msg.EventType, a.channelManager) + log.Printf("[%s] Subscribed to channel: %s", client.Conn.RemoteAddr().String(), msg.EventType) + } + return nil +} + +func (a *App) handleUnsubscribe(client *ws.Client, msg Message) error { + if msg.EventType == "price" { + for _, currency := range msg.Currencies { + channel := fmt.Sprintf("price:%s", currency) + client.Unsubscribe(channel) + log.Printf("[%s] Unsubscribed from channel: %s", client.Conn.RemoteAddr().String(), channel) + } + } else { + client.Unsubscribe(msg.EventType) + log.Printf("[%s] Unsubscribed from channel: %s", client.Conn.RemoteAddr().String(), msg.EventType) + } + return nil +} + +func (a *App) start() error { + port := os.Getenv("PORT") + if port == "" { + port = "80" + } + + // Create a channel to listen for OS signals + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Start the server in a goroutine + go func() { + if err := a.app.Listen(":" + port); err != nil { + log.Printf("Server error: %v", err) + } + }() + + // Wait for interrupt signal + <-sigChan + log.Println("Shutting down server...") + + // Create a timeout context for graceful shutdown + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + // Attempt graceful shutdown + if err := a.app.ShutdownWithContext(ctx); err != nil { + return fmt.Errorf("error during server shutdown: %w", err) + } + + return nil +} + +func main() { + app := NewApp() + + if err := app.initializeModules(); err != nil { + log.Fatalf("Failed to initialize modules: %v", err) + } + + app.setupWebSocketHandlers() + app.setupRoutes() + + if err := app.start(); err != nil { + log.Fatalf("Server error: %v", err) + } +} diff --git a/models/mempool.go b/models/mempool.go new file mode 100644 index 0000000..4dc1f9c --- /dev/null +++ b/models/mempool.go @@ -0,0 +1,74 @@ +package models + +// MempoolBlock represents a block in the mempool-blocks message +type MempoolBlock struct { + BlockSize int `json:"blockSize"` + BlockVSize float64 `json:"blockVSize"` + NTx int `json:"nTx"` + TotalFees int `json:"totalFees"` + MedianFee float64 `json:"medianFee"` + FeeRange []float64 `json:"feeRange"` +} + +// Block represents a block in the blocks message +type Block struct { + ID string `json:"id"` + Height int `json:"height"` + Version int `json:"version"` + Timestamp int64 `json:"timestamp"` + Bits int `json:"bits"` + Nonce uint32 `json:"nonce"` + Difficulty float64 `json:"difficulty"` + MerkleRoot string `json:"merkle_root"` + TxCount int `json:"tx_count"` + Size int `json:"size"` + Weight int `json:"weight"` + PreviousBlockHash string `json:"previousblockhash"` + MedianTime int64 `json:"mediantime"` + Stale bool `json:"stale"` + Extras BlockExtras `json:"extras"` +} + +type BlockExtras struct { + Reward int `json:"reward"` + CoinbaseRaw string `json:"coinbaseRaw"` + Orphans []string `json:"orphans"` + MedianFee float64 `json:"medianFee"` + FeeRange []float64 `json:"feeRange"` + TotalFees int `json:"totalFees"` + AvgFee int `json:"avgFee"` + AvgFeeRate int `json:"avgFeeRate"` + UtxoSetChange int `json:"utxoSetChange"` + AvgTxSize float64 `json:"avgTxSize"` + TotalInputs int `json:"totalInputs"` + TotalOutputs int `json:"totalOutputs"` + TotalOutputAmt int64 `json:"totalOutputAmt"` + SegwitTotalTxs int `json:"segwitTotalTxs"` + SegwitTotalSize int `json:"segwitTotalSize"` + SegwitTotalWeight int `json:"segwitTotalWeight"` + VirtualSize float64 `json:"virtualSize"` + CoinbaseAddress string `json:"coinbaseAddress"` + CoinbaseAddresses []string `json:"coinbaseAddresses"` + CoinbaseSignature string `json:"coinbaseSignature"` + CoinbaseSignatureAscii string `json:"coinbaseSignatureAscii"` + Header string `json:"header"` + UtxoSetSize *int `json:"utxoSetSize"` + TotalInputAmt *int64 `json:"totalInputAmt"` + Pool PoolInfo `json:"pool"` + MatchRate *float64 `json:"matchRate"` + ExpectedFees *int `json:"expectedFees"` + ExpectedWeight *int `json:"expectedWeight"` + Similarity float64 `json:"similarity"` +} + +type PoolInfo struct { + ID int `json:"id"` + Name string `json:"name"` + Slug string `json:"slug"` + MinerNames []string `json:"minerNames"` +} + +type MempoolFee struct { + FeeRate float64 `json:"fee_rate"` + Fee float64 `json:"fee"` +} diff --git a/models/ticker.go b/models/ticker.go new file mode 100644 index 0000000..7e9b3e3 --- /dev/null +++ b/models/ticker.go @@ -0,0 +1,8 @@ +package models + +type Ticker struct { + Symbol string `json:"symbol"` + Last float64 `json:"last"` + Bid float64 `json:"bid"` + Ask float64 `json:"ask"` +} diff --git a/modules/main.go b/modules/main.go new file mode 100644 index 0000000..b836740 --- /dev/null +++ b/modules/main.go @@ -0,0 +1,39 @@ +package modules + +import ( + "btclock/broker" + "context" + "sync" +) + +type Module interface { + Init(broker *broker.EventBroker) error + Start(ctx context.Context) error + Stop() error + ID() string +} + +type ModuleRegistry struct { + modules map[string]Module + mu sync.RWMutex +} + +var Registry = &ModuleRegistry{ + modules: make(map[string]Module), +} + +func (b *ModuleRegistry) Register(module Module) { + b.mu.Lock() + defer b.mu.Unlock() + b.modules[module.ID()] = module +} + +func (b *ModuleRegistry) GetAllModules() []Module { + b.mu.RLock() + defer b.mu.RUnlock() + modules := make([]Module, 0, len(b.modules)) + for _, module := range b.modules { + modules = append(modules, module) + } + return modules +} diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..827979b --- /dev/null +++ b/static/index.html @@ -0,0 +1,159 @@ + + + + + + WebSocket Debug Console + + + + + + + + + + + +
+
+ +
+

WebSocket Debug Console

+
+
+ Connecting... +
+
+ + +
+
+
+

WebSocket Messages

+
+ +
+
+
+
+
+
+ + + + + diff --git a/websocket/main.go b/websocket/main.go new file mode 100644 index 0000000..e1961be --- /dev/null +++ b/websocket/main.go @@ -0,0 +1,129 @@ +package websocket + +import ( + "bytes" + "fmt" + "sync" + + "github.com/gofiber/contrib/websocket" +) + +type Client struct { + Conn *websocket.Conn + Channels map[string]bool + mu sync.RWMutex +} + +func (client *Client) SendMessage(message []byte, messageType int) { + + if messageType == 0 { + messageType = websocket.BinaryMessage + } + + client.mu.Lock() + defer client.mu.Unlock() + + err := client.Conn.WriteMessage(messageType, message) + if err != nil { + fmt.Println("Error sending message:", err) + } +} + +type LastMessage struct { + Message []byte + MessageType int +} + +type ChannelManager struct { + clients map[*Client]bool + mu sync.RWMutex + register chan *Client + unregister chan *Client + lastMessages map[string]LastMessage +} + +func NewChannelManager() *ChannelManager { + return &ChannelManager{ + clients: make(map[*Client]bool), + register: make(chan *Client), + unregister: make(chan *Client), + lastMessages: make(map[string]LastMessage), + } +} + +func (client *Client) Subscribe(channel string, cm *ChannelManager) { + client.mu.Lock() + defer client.mu.Unlock() + client.Channels[channel] = true + cm.SendLastMessage(client, channel) +} + +func (client *Client) Unsubscribe(channel string) { + client.mu.Lock() + defer client.mu.Unlock() + delete(client.Channels, channel) +} + +func (client *Client) IsSubscribed(channel string) bool { + client.mu.RLock() + defer client.mu.RUnlock() + _, ok := client.Channels[channel] + return ok +} + +func (cm *ChannelManager) SendLastMessage(client *Client, channel string) { + cm.mu.RLock() + lastMessage, exists := cm.lastMessages[channel] + if exists { + go func() { + client.SendMessage(lastMessage.Message, lastMessage.MessageType) + }() + } + cm.mu.RUnlock() +} + +func (cm *ChannelManager) BroadcastTo(channel string, message []byte, messageType int) { + if bytes.Equal(message, cm.lastMessages[channel].Message) { + return + } + + if messageType == 0 { + messageType = websocket.BinaryMessage + } + + cm.mu.Lock() + defer cm.mu.Unlock() + cm.lastMessages[channel] = LastMessage{Message: message, MessageType: messageType} + + for client := range cm.clients { + if client.IsSubscribed(channel) { + client.SendMessage(message, messageType) + } + } +} + +func (cm *ChannelManager) Start() { + for { + select { + case client := <-cm.register: + cm.mu.Lock() + cm.clients[client] = true + cm.mu.Unlock() + case client := <-cm.unregister: + cm.mu.Lock() + if _, ok := cm.clients[client]; ok { + delete(cm.clients, client) + client.Conn.Close() + } + cm.mu.Unlock() + } + } +} + +func (cm *ChannelManager) Register(client *Client) { + cm.register <- client +} + +func (cm *ChannelManager) Unregister(client *Client) { + cm.unregister <- client +}