202 lines
4.6 KiB
Go
202 lines
4.6 KiB
Go
package clients
|
|
|
|
import (
|
|
"btclock/broker"
|
|
"btclock/models"
|
|
"btclock/modules"
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
|
|
coderws "github.com/coder/websocket"
|
|
"github.com/coder/websocket/wsjson"
|
|
)
|
|
|
|
type MempoolClient struct {
|
|
ws *coderws.Conn
|
|
broker *broker.EventBroker
|
|
eventTypes []string
|
|
}
|
|
|
|
type MempoolBlockEvent struct {
|
|
broker.Event
|
|
Block models.Block
|
|
}
|
|
|
|
// Name implements broker.Event.
|
|
func (m MempoolBlockEvent) Name() string {
|
|
return "mempool-block"
|
|
}
|
|
|
|
// Payload implements broker.Event.
|
|
func (m MempoolBlockEvent) Payload() interface{} {
|
|
return m.Block.Height
|
|
}
|
|
|
|
type MempoolFeeRateEvent struct {
|
|
broker.Event
|
|
FeeRate float64
|
|
}
|
|
|
|
// Name implements broker.Event.
|
|
func (m MempoolFeeRateEvent) Name() string {
|
|
return "mempool-fee-rate"
|
|
}
|
|
|
|
// Payload implements broker.Event.
|
|
func (m MempoolFeeRateEvent) Payload() interface{} {
|
|
return m.FeeRate
|
|
}
|
|
|
|
func init() {
|
|
modules.Registry.Register(NewMempoolClient())
|
|
}
|
|
|
|
func NewMempoolClient() *MempoolClient {
|
|
return &MempoolClient{
|
|
eventTypes: []string{"block", "feeRate"},
|
|
}
|
|
}
|
|
|
|
func (m *MempoolClient) Init(broker *broker.EventBroker) error {
|
|
m.broker = broker
|
|
return nil
|
|
}
|
|
|
|
func (m *MempoolClient) Start(ctx context.Context) error {
|
|
ws, _, err := coderws.Dial(ctx, "wss://mempool.dbtc.link/api/v1/ws", nil)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to mempool websocket: %w", err)
|
|
}
|
|
m.ws = ws
|
|
|
|
defer func() {
|
|
if err := m.ws.Close(coderws.StatusNormalClosure, ""); err != nil {
|
|
log.Printf("error closing websocket: %v", err)
|
|
}
|
|
}()
|
|
|
|
if err := wsjson.Write(ctx, m.ws, createSubscriptionMessage()); err != nil {
|
|
return fmt.Errorf("failed to send subscription message: %w", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
var v map[string]any
|
|
if err := wsjson.Read(ctx, m.ws, &v); err != nil {
|
|
return fmt.Errorf("error reading message: %w", err)
|
|
}
|
|
|
|
for key := range v {
|
|
switch key {
|
|
case "mempool-blocks":
|
|
if err := m.handleMempoolBlocksMessage(v); err != nil {
|
|
log.Printf("error handling mempool blocks: %v", err)
|
|
}
|
|
case "block":
|
|
if err := m.handleBlocksMessage(v); err != nil {
|
|
log.Printf("error handling block: %v", err)
|
|
}
|
|
case "blocks":
|
|
if err := m.handleBlocksArrayMessage(v); err != nil {
|
|
log.Printf("error handling blocks array: %v", err)
|
|
}
|
|
}
|
|
break // Only process the first key
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *MempoolClient) Stop() error {
|
|
if m.ws != nil {
|
|
return m.ws.Close(coderws.StatusNormalClosure, "")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *MempoolClient) ID() string {
|
|
return "mempool"
|
|
}
|
|
|
|
// createSubscriptionMessage builds the message Start writes to the websocket
// right after connecting: action "want" with the topics "mempool-blocks" and
// "blocks" as a string slice under "data".
func createSubscriptionMessage() map[string]any {
	topics := []string{"mempool-blocks", "blocks"}

	msg := make(map[string]any, 2)
	msg["action"] = "want"
	msg["data"] = topics
	return msg
}
|
|
|
|
func (m *MempoolClient) handleMempoolBlocksMessage(v map[string]any) error {
|
|
blocksRaw, ok := v["mempool-blocks"].([]any)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected mempool-blocks format")
|
|
}
|
|
for _, block := range blocksRaw {
|
|
blockMap, ok := block.(map[string]any)
|
|
if !ok {
|
|
continue
|
|
}
|
|
var mb models.MempoolBlock
|
|
// manual mapping since we have interface{}
|
|
if bs, ok := blockMap["blockSize"].(float64); ok {
|
|
mb.BlockSize = int(bs)
|
|
}
|
|
if bvs, ok := blockMap["blockVSize"].(float64); ok {
|
|
mb.BlockVSize = bvs
|
|
}
|
|
if ntx, ok := blockMap["nTx"].(float64); ok {
|
|
mb.NTx = int(ntx)
|
|
}
|
|
if tf, ok := blockMap["totalFees"].(float64); ok {
|
|
mb.TotalFees = int(tf)
|
|
}
|
|
if mf, ok := blockMap["medianFee"].(float64); ok {
|
|
mb.MedianFee = mf
|
|
}
|
|
if fr, ok := blockMap["feeRange"].([]any); ok {
|
|
mb.FeeRange = make([]float64, len(fr))
|
|
for i, v := range fr {
|
|
if f, ok := v.(float64); ok {
|
|
mb.FeeRange[i] = f
|
|
}
|
|
}
|
|
}
|
|
m.broker.Publish("mempool-fee-rate", MempoolFeeRateEvent{FeeRate: mb.MedianFee})
|
|
break
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *MempoolClient) handleBlocksMessage(v map[string]any) error {
|
|
blockRaw, ok := v["block"].(map[string]any)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected blocks format")
|
|
}
|
|
var b models.Block
|
|
if height, ok := blockRaw["height"].(float64); ok {
|
|
b.Height = int(height)
|
|
m.broker.Publish("mempool-block", MempoolBlockEvent{Block: b})
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *MempoolClient) handleBlocksArrayMessage(v map[string]any) error {
|
|
blocks, ok := v["blocks"].([]any)
|
|
if !ok || len(blocks) == 0 {
|
|
return fmt.Errorf("unexpected blocks format or empty blocks array")
|
|
}
|
|
|
|
blockRaw, ok := blocks[len(blocks)-1].(map[string]any)
|
|
if !ok {
|
|
return fmt.Errorf("unexpected block format in blocks array")
|
|
}
|
|
|
|
if height, ok := blockRaw["height"].(float64); ok {
|
|
log.Printf("Last block height: %d", int(height))
|
|
m.broker.Publish("mempool-block", MempoolBlockEvent{Block: models.Block{Height: int(height)}})
|
|
}
|
|
return nil
|
|
}
|