Initial commit
This commit is contained in:
commit
c5111cb551
20 changed files with 1764 additions and 0 deletions
138
clients/kraken.go
Normal file
138
clients/kraken.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package clients
|
||||
|
||||
import (
|
||||
"btclock/broker"
|
||||
"btclock/models"
|
||||
"btclock/modules"
|
||||
"context"
|
||||
"log"
|
||||
|
||||
"github.com/coder/websocket"
|
||||
"github.com/coder/websocket/wsjson"
|
||||
)
|
||||
|
||||
type KrakenClient struct {
|
||||
ws *websocket.Conn
|
||||
eventTypes []string
|
||||
broker *broker.EventBroker
|
||||
}
|
||||
|
||||
type KrakenTickerEvent struct {
|
||||
Symbol string
|
||||
Last float64
|
||||
}
|
||||
|
||||
// Name implements broker.Event.
|
||||
func (k KrakenTickerEvent) Name() string {
|
||||
return "ticker"
|
||||
}
|
||||
|
||||
// Payload implements broker.Event.
|
||||
func (k KrakenTickerEvent) Payload() interface{} {
|
||||
return map[string]interface{}{
|
||||
"symbol": k.Symbol,
|
||||
"last": k.Last,
|
||||
}
|
||||
}
|
||||
|
||||
func init() {
|
||||
modules.Registry.Register(NewKrakenClient())
|
||||
}
|
||||
|
||||
func NewKrakenClient() *KrakenClient {
|
||||
return &KrakenClient{
|
||||
eventTypes: []string{"ticker"},
|
||||
}
|
||||
}
|
||||
|
||||
func (k *KrakenClient) Init(broker *broker.EventBroker) error {
|
||||
k.broker = broker
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KrakenClient) Start(ctx context.Context) error {
|
||||
ws, _, err := websocket.Dial(ctx, "wss://ws.kraken.com/v2", nil)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
defer ws.CloseNow()
|
||||
|
||||
err = wsjson.Write(ctx, ws, map[string]interface{}{
|
||||
"method": "subscribe",
|
||||
"params": map[string]interface{}{
|
||||
"channel": "ticker",
|
||||
"symbol": []string{"BTC/USD", "BTC/EUR", "BTC/GBP", "BTC/JPY", "BTC/CHF", "BTC/CAD", "BTC/AUD"},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Expect subscription confirmation
|
||||
var v map[string]interface{}
|
||||
err = wsjson.Read(ctx, ws, &v)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// Read messages until the connection is closed
|
||||
for {
|
||||
var v map[string]any
|
||||
err = wsjson.Read(ctx, ws, &v)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
channel, ok := v["channel"].(string)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if channel == "ticker" {
|
||||
tickerData := handleTickerMessage(v)
|
||||
tickerEvent := KrakenTickerEvent{
|
||||
Symbol: tickerData.Symbol,
|
||||
Last: tickerData.Last,
|
||||
}
|
||||
k.broker.Publish("ticker", tickerEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (k *KrakenClient) Stop() error {
|
||||
k.ws.Close(websocket.StatusNormalClosure, "")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (k *KrakenClient) ID() string {
|
||||
return "kraken"
|
||||
}
|
||||
|
||||
func handleTickerMessage(v map[string]any) models.Ticker {
|
||||
data, ok := v["data"].([]interface{})
|
||||
if !ok {
|
||||
return models.Ticker{}
|
||||
}
|
||||
|
||||
for _, item := range data {
|
||||
if ticker, ok := item.(map[string]interface{}); ok {
|
||||
symbol, symbolOk := ticker["symbol"].(string)
|
||||
last, lastOk := ticker["last"].(float64)
|
||||
bid, bidOk := ticker["bid"].(float64)
|
||||
ask, askOk := ticker["ask"].(float64)
|
||||
|
||||
if symbolOk && lastOk && bidOk && askOk {
|
||||
ticker := models.Ticker{
|
||||
Symbol: symbol,
|
||||
Last: last,
|
||||
Bid: bid,
|
||||
Ask: ask,
|
||||
}
|
||||
// fmt.Printf("%+v\n", ticker)
|
||||
return ticker
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return models.Ticker{}
|
||||
}
|
102
clients/kraken_test.go
Normal file
102
clients/kraken_test.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package clients
|
||||
|
||||
import (
|
||||
"btclock/broker"
|
||||
"btclock/models"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestKrakenClient_HandleTickerMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]any
|
||||
expectedTicker models.Ticker
|
||||
}{
|
||||
{
|
||||
name: "valid ticker message",
|
||||
input: map[string]any{
|
||||
"data": []any{
|
||||
map[string]any{
|
||||
"symbol": "BTC/USD",
|
||||
"last": 50000.0,
|
||||
"bid": 49900.0,
|
||||
"ask": 50100.0,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTicker: models.Ticker{
|
||||
Symbol: "BTC/USD",
|
||||
Last: 50000.0,
|
||||
Bid: 49900.0,
|
||||
Ask: 50100.0,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid data format",
|
||||
input: map[string]any{
|
||||
"data": "invalid",
|
||||
},
|
||||
expectedTicker: models.Ticker{},
|
||||
},
|
||||
{
|
||||
name: "empty data array",
|
||||
input: map[string]any{
|
||||
"data": []any{},
|
||||
},
|
||||
expectedTicker: models.Ticker{},
|
||||
},
|
||||
{
|
||||
name: "missing required fields",
|
||||
input: map[string]any{
|
||||
"data": []any{
|
||||
map[string]any{
|
||||
"symbol": "BTC/USD",
|
||||
// missing last, bid, ask
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedTicker: models.Ticker{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := handleTickerMessage(tt.input)
|
||||
assert.Equal(t, tt.expectedTicker, result)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestKrakenClient_Init(t *testing.T) {
|
||||
broker := broker.NewEventBroker()
|
||||
client := NewKrakenClient()
|
||||
|
||||
err := client.Init(broker)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, broker, client.broker)
|
||||
}
|
||||
|
||||
func TestKrakenClient_ID(t *testing.T) {
|
||||
client := NewKrakenClient()
|
||||
assert.Equal(t, "kraken", client.ID())
|
||||
}
|
||||
|
||||
func TestKrakenTickerEvent_Name(t *testing.T) {
|
||||
event := KrakenTickerEvent{
|
||||
Symbol: "BTC/USD",
|
||||
Last: 50000.0,
|
||||
}
|
||||
assert.Equal(t, "ticker", event.Name())
|
||||
}
|
||||
|
||||
func TestKrakenTickerEvent_Payload(t *testing.T) {
|
||||
event := KrakenTickerEvent{
|
||||
Symbol: "BTC/USD",
|
||||
Last: 50000.0,
|
||||
}
|
||||
payload := event.Payload().(map[string]interface{})
|
||||
assert.Equal(t, "BTC/USD", payload["symbol"])
|
||||
assert.Equal(t, 50000.0, payload["last"])
|
||||
}
|
202
clients/mempool.go
Normal file
202
clients/mempool.go
Normal file
|
@ -0,0 +1,202 @@
|
|||
package clients
|
||||
|
||||
import (
|
||||
"btclock/broker"
|
||||
"btclock/models"
|
||||
"btclock/modules"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
coderws "github.com/coder/websocket"
|
||||
"github.com/coder/websocket/wsjson"
|
||||
)
|
||||
|
||||
type MempoolClient struct {
|
||||
ws *coderws.Conn
|
||||
broker *broker.EventBroker
|
||||
eventTypes []string
|
||||
}
|
||||
|
||||
type MempoolBlockEvent struct {
|
||||
broker.Event
|
||||
Block models.Block
|
||||
}
|
||||
|
||||
// Name implements broker.Event.
|
||||
func (m MempoolBlockEvent) Name() string {
|
||||
return "mempool-block"
|
||||
}
|
||||
|
||||
// Payload implements broker.Event.
|
||||
func (m MempoolBlockEvent) Payload() interface{} {
|
||||
return m.Block.Height
|
||||
}
|
||||
|
||||
type MempoolFeeRateEvent struct {
|
||||
broker.Event
|
||||
FeeRate float64
|
||||
}
|
||||
|
||||
// Name implements broker.Event.
|
||||
func (m MempoolFeeRateEvent) Name() string {
|
||||
return "mempool-fee-rate"
|
||||
}
|
||||
|
||||
// Payload implements broker.Event.
|
||||
func (m MempoolFeeRateEvent) Payload() interface{} {
|
||||
return m.FeeRate
|
||||
}
|
||||
|
||||
func init() {
|
||||
modules.Registry.Register(NewMempoolClient())
|
||||
}
|
||||
|
||||
func NewMempoolClient() *MempoolClient {
|
||||
return &MempoolClient{
|
||||
eventTypes: []string{"block", "feeRate"},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MempoolClient) Init(broker *broker.EventBroker) error {
|
||||
m.broker = broker
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MempoolClient) Start(ctx context.Context) error {
|
||||
ws, _, err := coderws.Dial(ctx, "wss://mempool.dbtc.link/api/v1/ws", nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to mempool websocket: %w", err)
|
||||
}
|
||||
m.ws = ws
|
||||
|
||||
defer func() {
|
||||
if err := m.ws.Close(coderws.StatusNormalClosure, ""); err != nil {
|
||||
log.Printf("error closing websocket: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
if err := wsjson.Write(ctx, m.ws, createSubscriptionMessage()); err != nil {
|
||||
return fmt.Errorf("failed to send subscription message: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
var v map[string]any
|
||||
if err := wsjson.Read(ctx, m.ws, &v); err != nil {
|
||||
return fmt.Errorf("error reading message: %w", err)
|
||||
}
|
||||
|
||||
for key := range v {
|
||||
switch key {
|
||||
case "mempool-blocks":
|
||||
if err := m.handleMempoolBlocksMessage(v); err != nil {
|
||||
log.Printf("error handling mempool blocks: %v", err)
|
||||
}
|
||||
case "block":
|
||||
if err := m.handleBlocksMessage(v); err != nil {
|
||||
log.Printf("error handling block: %v", err)
|
||||
}
|
||||
case "blocks":
|
||||
if err := m.handleBlocksArrayMessage(v); err != nil {
|
||||
log.Printf("error handling blocks array: %v", err)
|
||||
}
|
||||
}
|
||||
break // Only process the first key
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MempoolClient) Stop() error {
|
||||
if m.ws != nil {
|
||||
return m.ws.Close(coderws.StatusNormalClosure, "")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MempoolClient) ID() string {
|
||||
return "mempool"
|
||||
}
|
||||
|
||||
func createSubscriptionMessage() map[string]any {
|
||||
return map[string]any{
|
||||
"action": "want",
|
||||
"data": []string{"mempool-blocks", "blocks"},
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MempoolClient) handleMempoolBlocksMessage(v map[string]any) error {
|
||||
blocksRaw, ok := v["mempool-blocks"].([]any)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected mempool-blocks format")
|
||||
}
|
||||
for _, block := range blocksRaw {
|
||||
blockMap, ok := block.(map[string]any)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
var mb models.MempoolBlock
|
||||
// manual mapping since we have interface{}
|
||||
if bs, ok := blockMap["blockSize"].(float64); ok {
|
||||
mb.BlockSize = int(bs)
|
||||
}
|
||||
if bvs, ok := blockMap["blockVSize"].(float64); ok {
|
||||
mb.BlockVSize = bvs
|
||||
}
|
||||
if ntx, ok := blockMap["nTx"].(float64); ok {
|
||||
mb.NTx = int(ntx)
|
||||
}
|
||||
if tf, ok := blockMap["totalFees"].(float64); ok {
|
||||
mb.TotalFees = int(tf)
|
||||
}
|
||||
if mf, ok := blockMap["medianFee"].(float64); ok {
|
||||
mb.MedianFee = mf
|
||||
}
|
||||
if fr, ok := blockMap["feeRange"].([]any); ok {
|
||||
mb.FeeRange = make([]float64, len(fr))
|
||||
for i, v := range fr {
|
||||
if f, ok := v.(float64); ok {
|
||||
mb.FeeRange[i] = f
|
||||
}
|
||||
}
|
||||
}
|
||||
m.broker.Publish("mempool-fee-rate", MempoolFeeRateEvent{FeeRate: mb.MedianFee})
|
||||
break
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MempoolClient) handleBlocksMessage(v map[string]any) error {
|
||||
blockRaw, ok := v["block"].(map[string]any)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected blocks format")
|
||||
}
|
||||
var b models.Block
|
||||
if height, ok := blockRaw["height"].(float64); ok {
|
||||
b.Height = int(height)
|
||||
m.broker.Publish("mempool-block", MempoolBlockEvent{Block: b})
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *MempoolClient) handleBlocksArrayMessage(v map[string]any) error {
|
||||
blocks, ok := v["blocks"].([]any)
|
||||
if !ok || len(blocks) == 0 {
|
||||
return fmt.Errorf("unexpected blocks format or empty blocks array")
|
||||
}
|
||||
|
||||
blockRaw, ok := blocks[len(blocks)-1].(map[string]any)
|
||||
if !ok {
|
||||
return fmt.Errorf("unexpected block format in blocks array")
|
||||
}
|
||||
|
||||
if height, ok := blockRaw["height"].(float64); ok {
|
||||
log.Printf("Last block height: %d", int(height))
|
||||
m.broker.Publish("mempool-block", MempoolBlockEvent{Block: models.Block{Height: int(height)}})
|
||||
}
|
||||
return nil
|
||||
}
|
184
clients/mempool_test.go
Normal file
184
clients/mempool_test.go
Normal file
|
@ -0,0 +1,184 @@
|
|||
package clients
|
||||
|
||||
import (
|
||||
"btclock/broker"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMempoolClient_HandleMempoolBlocksMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]any
|
||||
expectedError bool
|
||||
expectedFee float64
|
||||
}{
|
||||
{
|
||||
name: "valid mempool blocks message",
|
||||
input: map[string]any{
|
||||
"mempool-blocks": []any{
|
||||
map[string]any{
|
||||
"blockSize": 1000.0,
|
||||
"blockVSize": 2000.0,
|
||||
"nTx": 100.0,
|
||||
"totalFees": 500.0,
|
||||
"medianFee": 5.5,
|
||||
"feeRange": []any{1.0, 2.0, 3.0},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
expectedFee: 5.5,
|
||||
},
|
||||
{
|
||||
name: "invalid mempool blocks format",
|
||||
input: map[string]any{
|
||||
"mempool-blocks": "invalid",
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
{
|
||||
name: "empty mempool blocks",
|
||||
input: map[string]any{
|
||||
"mempool-blocks": []any{},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
broker := broker.NewEventBroker()
|
||||
client := &MempoolClient{
|
||||
broker: broker,
|
||||
}
|
||||
|
||||
err := client.handleMempoolBlocksMessage(tt.input)
|
||||
if tt.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolClient_HandleBlocksMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]any
|
||||
expectedError bool
|
||||
expectedHeight int
|
||||
}{
|
||||
{
|
||||
name: "valid block message",
|
||||
input: map[string]any{
|
||||
"block": map[string]any{
|
||||
"height": 800000.0,
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
expectedHeight: 800000,
|
||||
},
|
||||
{
|
||||
name: "invalid block format",
|
||||
input: map[string]any{
|
||||
"block": "invalid",
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
{
|
||||
name: "missing height",
|
||||
input: map[string]any{
|
||||
"block": map[string]any{},
|
||||
},
|
||||
expectedError: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
broker := broker.NewEventBroker()
|
||||
client := &MempoolClient{
|
||||
broker: broker,
|
||||
}
|
||||
|
||||
err := client.handleBlocksMessage(tt.input)
|
||||
if tt.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolClient_HandleBlocksArrayMessage(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]any
|
||||
expectedError bool
|
||||
expectedHeight int
|
||||
}{
|
||||
{
|
||||
name: "valid blocks array message",
|
||||
input: map[string]any{
|
||||
"blocks": []any{
|
||||
map[string]any{
|
||||
"height": 799999.0,
|
||||
},
|
||||
map[string]any{
|
||||
"height": 800000.0,
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedError: false,
|
||||
expectedHeight: 800000,
|
||||
},
|
||||
{
|
||||
name: "invalid blocks format",
|
||||
input: map[string]any{
|
||||
"blocks": "invalid",
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
{
|
||||
name: "empty blocks array",
|
||||
input: map[string]any{
|
||||
"blocks": []any{},
|
||||
},
|
||||
expectedError: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
broker := broker.NewEventBroker()
|
||||
client := &MempoolClient{
|
||||
broker: broker,
|
||||
}
|
||||
|
||||
err := client.handleBlocksArrayMessage(tt.input)
|
||||
if tt.expectedError {
|
||||
assert.Error(t, err)
|
||||
} else {
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMempoolClient_Init(t *testing.T) {
|
||||
broker := broker.NewEventBroker()
|
||||
client := NewMempoolClient()
|
||||
|
||||
err := client.Init(broker)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, broker, client.broker)
|
||||
}
|
||||
|
||||
func TestMempoolClient_ID(t *testing.T) {
|
||||
client := NewMempoolClient()
|
||||
assert.Equal(t, "mempool", client.ID())
|
||||
}
|
35
clients/mock_broker.go
Normal file
35
clients/mock_broker.go
Normal file
|
@ -0,0 +1,35 @@
|
|||
package clients
|
||||
|
||||
import (
|
||||
"btclock/broker"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockEventBroker implements broker.EventBroker for testing
|
||||
type MockEventBroker struct {
|
||||
mock.Mock
|
||||
realBroker *broker.EventBroker
|
||||
}
|
||||
|
||||
func NewMockEventBroker() *broker.EventBroker {
|
||||
mock := &MockEventBroker{
|
||||
realBroker: broker.NewEventBroker(),
|
||||
}
|
||||
return mock.realBroker
|
||||
}
|
||||
|
||||
func (m *MockEventBroker) Register(eventType string, handler broker.EventHandler) {
|
||||
m.Called(eventType, handler)
|
||||
m.realBroker.Register(eventType, handler)
|
||||
}
|
||||
|
||||
func (m *MockEventBroker) Publish(eventType string, event broker.Event) {
|
||||
m.Called(eventType, event)
|
||||
m.realBroker.Publish(eventType, event)
|
||||
}
|
||||
|
||||
func (m *MockEventBroker) Unregister(eventType string, handler broker.EventHandler) {
|
||||
m.Called(eventType, handler)
|
||||
m.realBroker.Unregister(eventType, handler)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue