package clients import ( "btclock/broker" "btclock/models" "btclock/modules" "context" "fmt" "log" coderws "github.com/coder/websocket" "github.com/coder/websocket/wsjson" ) type MempoolClient struct { ws *coderws.Conn broker *broker.EventBroker eventTypes []string } type MempoolBlockEvent struct { broker.Event Block models.Block } // Name implements broker.Event. func (m MempoolBlockEvent) Name() string { return "mempool-block" } // Payload implements broker.Event. func (m MempoolBlockEvent) Payload() interface{} { return m.Block.Height } type MempoolFeeRateEvent struct { broker.Event FeeRate float64 } // Name implements broker.Event. func (m MempoolFeeRateEvent) Name() string { return "mempool-fee-rate" } // Payload implements broker.Event. func (m MempoolFeeRateEvent) Payload() interface{} { return m.FeeRate } func init() { modules.Registry.Register(NewMempoolClient()) } func NewMempoolClient() *MempoolClient { return &MempoolClient{ eventTypes: []string{"block", "feeRate"}, } } func (m *MempoolClient) Init(broker *broker.EventBroker) error { m.broker = broker return nil } func (m *MempoolClient) Start(ctx context.Context) error { ws, _, err := coderws.Dial(ctx, "wss://mempool.dbtc.link/api/v1/ws", nil) if err != nil { return fmt.Errorf("failed to connect to mempool websocket: %w", err) } m.ws = ws defer func() { if err := m.ws.Close(coderws.StatusNormalClosure, ""); err != nil { log.Printf("error closing websocket: %v", err) } }() if err := wsjson.Write(ctx, m.ws, createSubscriptionMessage()); err != nil { return fmt.Errorf("failed to send subscription message: %w", err) } for { select { case <-ctx.Done(): return ctx.Err() default: var v map[string]any if err := wsjson.Read(ctx, m.ws, &v); err != nil { return fmt.Errorf("error reading message: %w", err) } for key := range v { switch key { case "mempool-blocks": if err := m.handleMempoolBlocksMessage(v); err != nil { log.Printf("error handling mempool blocks: %v", err) } case "block": if err := m.handleBlocksMessage(v); err != nil { log.Printf("error handling block: %v", err) } case "blocks": if err := m.handleBlocksArrayMessage(v); err != nil { log.Printf("error handling blocks array: %v", err) } } break // Only process the first key } } } } func (m *MempoolClient) Stop() error { if m.ws != nil { return m.ws.Close(coderws.StatusNormalClosure, "") } return nil } func (m *MempoolClient) ID() string { return "mempool" } func createSubscriptionMessage() map[string]any { return map[string]any{ "action": "want", "data": []string{"mempool-blocks", "blocks"}, } } func (m *MempoolClient) handleMempoolBlocksMessage(v map[string]any) error { blocksRaw, ok := v["mempool-blocks"].([]any) if !ok { return fmt.Errorf("unexpected mempool-blocks format") } for _, block := range blocksRaw { blockMap, ok := block.(map[string]any) if !ok { continue } var mb models.MempoolBlock // manual mapping since we have interface{} if bs, ok := blockMap["blockSize"].(float64); ok { mb.BlockSize = int(bs) } if bvs, ok := blockMap["blockVSize"].(float64); ok { mb.BlockVSize = bvs } if ntx, ok := blockMap["nTx"].(float64); ok { mb.NTx = int(ntx) } if tf, ok := blockMap["totalFees"].(float64); ok { mb.TotalFees = int(tf) } if mf, ok := blockMap["medianFee"].(float64); ok { mb.MedianFee = mf } if fr, ok := blockMap["feeRange"].([]any); ok { mb.FeeRange = make([]float64, len(fr)) for i, v := range fr { if f, ok := v.(float64); ok { mb.FeeRange[i] = f } } } m.broker.Publish("mempool-fee-rate", MempoolFeeRateEvent{FeeRate: mb.MedianFee}) break } return nil } func (m *MempoolClient) handleBlocksMessage(v map[string]any) error { blockRaw, ok := v["block"].(map[string]any) if !ok { return fmt.Errorf("unexpected blocks format") } var b models.Block if height, ok := blockRaw["height"].(float64); ok { b.Height = int(height) m.broker.Publish("mempool-block", MempoolBlockEvent{Block: b}) } return nil } func (m *MempoolClient) handleBlocksArrayMessage(v map[string]any) error { blocks, ok := v["blocks"].([]any) if !ok || len(blocks) == 0 { return fmt.Errorf("unexpected blocks format or empty blocks array") } blockRaw, ok := blocks[len(blocks)-1].(map[string]any) if !ok { return fmt.Errorf("unexpected block format in blocks array") } if height, ok := blockRaw["height"].(float64); ok { log.Printf("Last block height: %d", int(height)) m.broker.Publish("mempool-block", MempoolBlockEvent{Block: models.Block{Height: int(height)}}) } return nil }