129 lines
2.6 KiB
Go
129 lines
2.6 KiB
Go
package websocket
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/gofiber/contrib/websocket"
|
|
)
|
|
|
|
type Client struct {
|
|
Conn *websocket.Conn
|
|
Channels map[string]bool
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (client *Client) SendMessage(message []byte, messageType int) {
|
|
|
|
if messageType == 0 {
|
|
messageType = websocket.BinaryMessage
|
|
}
|
|
|
|
client.mu.Lock()
|
|
defer client.mu.Unlock()
|
|
|
|
err := client.Conn.WriteMessage(messageType, message)
|
|
if err != nil {
|
|
fmt.Println("Error sending message:", err)
|
|
}
|
|
}
|
|
|
|
// LastMessage is the most recent payload broadcast on a channel, kept so it
// can be replayed to a client that subscribes later.
type LastMessage struct {
	// Message is the raw payload bytes.
	Message []byte
	// MessageType is the websocket message type the payload was sent with.
	MessageType int
}
|
|
|
|
type ChannelManager struct {
|
|
clients map[*Client]bool
|
|
mu sync.RWMutex
|
|
register chan *Client
|
|
unregister chan *Client
|
|
lastMessages map[string]LastMessage
|
|
}
|
|
|
|
func NewChannelManager() *ChannelManager {
|
|
return &ChannelManager{
|
|
clients: make(map[*Client]bool),
|
|
register: make(chan *Client),
|
|
unregister: make(chan *Client),
|
|
lastMessages: make(map[string]LastMessage),
|
|
}
|
|
}
|
|
|
|
func (client *Client) Subscribe(channel string, cm *ChannelManager) {
|
|
client.mu.Lock()
|
|
defer client.mu.Unlock()
|
|
client.Channels[channel] = true
|
|
cm.SendLastMessage(client, channel)
|
|
}
|
|
|
|
func (client *Client) Unsubscribe(channel string) {
|
|
client.mu.Lock()
|
|
defer client.mu.Unlock()
|
|
delete(client.Channels, channel)
|
|
}
|
|
|
|
func (client *Client) IsSubscribed(channel string) bool {
|
|
client.mu.RLock()
|
|
defer client.mu.RUnlock()
|
|
_, ok := client.Channels[channel]
|
|
return ok
|
|
}
|
|
|
|
func (cm *ChannelManager) SendLastMessage(client *Client, channel string) {
|
|
cm.mu.RLock()
|
|
lastMessage, exists := cm.lastMessages[channel]
|
|
if exists {
|
|
go func() {
|
|
client.SendMessage(lastMessage.Message, lastMessage.MessageType)
|
|
}()
|
|
}
|
|
cm.mu.RUnlock()
|
|
}
|
|
|
|
func (cm *ChannelManager) BroadcastTo(channel string, message []byte, messageType int) {
|
|
if bytes.Equal(message, cm.lastMessages[channel].Message) {
|
|
return
|
|
}
|
|
|
|
if messageType == 0 {
|
|
messageType = websocket.BinaryMessage
|
|
}
|
|
|
|
cm.mu.Lock()
|
|
defer cm.mu.Unlock()
|
|
cm.lastMessages[channel] = LastMessage{Message: message, MessageType: messageType}
|
|
|
|
for client := range cm.clients {
|
|
if client.IsSubscribed(channel) {
|
|
client.SendMessage(message, messageType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cm *ChannelManager) Start() {
|
|
for {
|
|
select {
|
|
case client := <-cm.register:
|
|
cm.mu.Lock()
|
|
cm.clients[client] = true
|
|
cm.mu.Unlock()
|
|
case client := <-cm.unregister:
|
|
cm.mu.Lock()
|
|
if _, ok := cm.clients[client]; ok {
|
|
delete(cm.clients, client)
|
|
client.Conn.Close()
|
|
}
|
|
cm.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (cm *ChannelManager) Register(client *Client) {
|
|
cm.register <- client
|
|
}
|
|
|
|
func (cm *ChannelManager) Unregister(client *Client) {
|
|
cm.unregister <- client
|
|
}
|