Message by keito0tada · Pull Request #1 · comb19/chat_back · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Message #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,10 @@
# chat_back
# chat_back
チャットアプリのバックエンド


![Static Badge](https://img.shields.io/badge/Go-00ADD8?logo=Go&logoColor=white&style=for-the-badge)
![Static Badge](https://img.shields.io/badge/postgresql-4169e1?style=for-the-badge&logo=postgresql&logoColor=white)
![Static Badge](https://img.shields.io/badge/-Docker-2496ED?style=for-the-badge&logo=Docker&logoColor=white)
![Static Badge](https://img.shields.io/badge/Docker-Compose-blue?style=for-the-badge&logo=docker&logoColor=white)
![Static Badge](https://img.shields.io/badge/-Clerk-6C47FF?style=for-the-badge&logo=clerk&logoColor=white)
![Static Badge](https://img.shields.io/badge/-OpenAPI%203.0-success?style=for-the-badge)
242 changes: 145 additions & 97 deletions api/interface/handler/message.go
8000
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package handler

import (
"chat_back/usecase"
"encoding/json"
"fmt"
"net/http"

"encoding/json"
"time"

"github.com/clerk/clerk-sdk-go/v2"

"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"gorm.io/gorm"
Expand All @@ -18,6 +19,18 @@ const (
Send = "send"
)

// Timing and size limits applied to each websocket connection by
// readPump and writePump.
const (
// writeWait is added to time.Now() to form the write deadline that
// writePump sets before every write (data, close, or ping).
writeWait = 10 * time.Second
// pingPeriod is the interval of the ticker that makes writePump send a
// ping frame.
// NOTE(review): presumably this must stay below pongWait so a pong can
// arrive before the read deadline expires; 1s is far below it — confirm
// the value is intentional.
pingPeriod = 1 * time.Second
// pongWait is added to time.Now() to form the read deadline; readPump
// sets it initially and again each time a pong is received.
pongWait = 60 * time.Second
// maxMessageSize is passed to SetReadLimit in readPump.
maxMessageSize = 4096
)

// Single-byte slices for a newline and a space.
// NOTE(review): neither variable is referenced in the visible part of this
// file — confirm whether they are still needed.
var (
newline = []byte{'\n'}
space = []byte{' '}
)

type sentMessage struct {
Action string `json:"action"`
ChannelID string `json:"channel_id"`
Expand Down Expand Up @@ -57,58 +70,139 @@ type messageHandler struct {
}

// Client is a single websocket connection that is registered with a Hub
// under a channel ID.
type Client struct {
// hub is the Hub this client registers with, unregisters from, and
// forwards read messages to.
hub *Hub
// conn is the underlying websocket connection.
conn *websocket.Conn
// userID is printed by the hub when a message is delivered.
// NOTE(review): the visible construction site in HandleMessageWebSocket
// does not set this field — confirm whether it is populated elsewhere.
userID string
// channelID is the key under which the hub groups this client.
channelID string
// send chan []byte
// send carries messages from the hub to this client's writePump; the hub
// closes it when it drops the client.
send chan message
}

type Hub struct {
clients *map[string]map[*Client]struct{}
clients map[string]map[*Client]struct{}
// clients map[*Client]struct{}
register chan *Client
unregister chan *Client
broadcast chan message
// broadcast chan []byte
broadcast chan message
}

func (h *Hub) run() {
for {
select {
case client := <-h.register:
if _, ok := (*h.clients)[client.channelID]; !ok {
(*h.clients)[client.channelID] = make(map[*Client]struct{})
fmt.Println("register")
if _, ok := h.clients[client.channelID]; !ok {
h.clients[client.channelID] = make(map[*Client]struct{})
}
(*h.clients)[client.channelID][client] = struct{}{}
h.clients[client.channelID][client] = struct{}{}

case client := <-h.unregister:
if _, ok := (*h.clients)[client.channelID]; ok {
if _, ok := (*h.clients)[client.channelID][client]; ok {
client.conn.Close()
delete((*h.clients)[client.channelID], client)
if len((*h.clients)[client.channelID]) == 0 {
delete(*h.clients, client.channelID)
}
fmt.Println("unregister")
if _, ok := h.clients[client.channelID][client]; ok {
delete(h.clients[client.channelID], client)
close(client.send)
fmt.Printf("Websocket in %s closed", client.channelID)
if len(h.clients[client.channelID]) <= 0 {
delete(h.clients, client.channelID)
fmt.Printf("All websocket in %s closed", client.channelID)
}
}

case msg := <-h.broadcast:
parsedMessage, err := json.Marshal(msg)
if err != nil {
fmt.Println("Error marshalling message:", err)
continue
fmt.Println("broadcast")
fmt.Println("msg", msg)
for client := range h.clients[msg.ChannelID] {
select {
case client.send <- msg:
fmt.Println("msg sent to ", client.channelID, client.userID)
default:
close(client.send)
delete(h.clients[client.channelID], client)
}
}
for client, _ := range (*h.clients)[msg.ChannelID] {
fmt.Println("broadcast:", msg.ChannelID)
}

if client.channelID != msg.ChannelID {
fmt.Println("channel id mismatch:", client.channelID)
}
}
}

err = client.conn.WriteMessage(websocket.TextMessage, parsedMessage)
if err != nil {
fmt.Println("Error writing message:", err)
h.unregister <- client
}
// readPump reads JSON messages from the client's websocket connection,
// stores each one through uc, and hands the stored message to the hub for
// broadcasting. It runs until the connection fails or a message is rejected,
// then unregisters the client from the hub and closes the connection.
//
// db is the database handle passed through to uc.Insert, uc persists the
// message, and user is the authenticated sender; user must be non-nil.
//
// Identity and routing fields are never taken from the client payload: the
// channel must match the one this client was registered for, and the user ID
// and user name are always overwritten with the authenticated user's values.
func (c *Client) readPump(db *gorm.DB, uc usecase.MessageUsecase, user *clerk.User) {
	fmt.Println("read pump")
	defer func() {
		c.hub.unregister <- c
		c.conn.Close()
	}()

	c.conn.SetReadLimit(maxMessageSize)
	c.conn.SetReadDeadline(time.Now().Add(pongWait))
	// Every pong pushes the read deadline forward, so a silent peer is
	// dropped once pongWait elapses without one.
	c.conn.SetPongHandler(func(string) error {
		c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})

	for {
		fmt.Println("reading")
		_, rawMsg, err := c.conn.ReadMessage()
		fmt.Println(string(rawMsg), err)
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				fmt.Println("unexpected close error:", err)
			}
			break
		}

		var msg message
		if err := json.Unmarshal(rawMsg, &msg); err != nil {
			// Print the payload as text; a bare []byte would be logged as a
			// list of numbers.
			fmt.Println("fail to unmarshal:", string(rawMsg), err)
			break
		}
		fmt.Println("read", msg)

		// The client was authorized for c.channelID only; refuse to store or
		// broadcast into any other channel named in the payload.
		if msg.ChannelID != c.channelID {
			fmt.Println("channel id mismatch:", msg.ChannelID)
			break
		}

		insertedMsg, err := uc.Insert(db, msg.ChannelID, user.ID, msg.Content)
		if err != nil {
			fmt.Println("message insertion error", err)
			break
		}

		msg.ID = insertedMsg.ID
		// Overwrite sender fields so a client cannot impersonate another user.
		msg.UserID = user.ID
		msg.UserName = ""
		if user.Username != nil { // Username is optional; avoid a nil dereference.
			msg.UserName = *user.Username
		}

		c.hub.broadcast <- msg
	}
}

// writePump delivers messages queued on c.send to the websocket connection
// as JSON text frames and sends a ping every pingPeriod. It returns, stopping
// the ticker and closing the connection, when c.send is closed or when any
// write to the connection fails.
func (c *Client) writePump() {
	fmt.Println("write pump")

	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		c.conn.Close()
	}()

	for {
		fmt.Println("writing", c.channelID)
		select {
		case msg, ok := <-c.send:
			fmt.Println("get msg <- send")
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// The send channel was closed: tell the peer we are done.
				// Best effort only; the connection is closed right after.
				_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}

			payload, err := json.Marshal(msg)
			if err != nil {
				// Skip this message instead of writing an empty frame.
				fmt.Println("fail to marshal:", msg)
				continue
			}

			// A failed write means the connection is unusable; stop now
			// rather than waiting for the next ping to notice.
			if err := c.conn.WriteMessage(websocket.TextMessage, payload); err != nil {
				fmt.Println("fail to write message:", err)
				return
			}

		case <-ticker.C:
			c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}
Expand All @@ -117,15 +211,17 @@ func NewMessageHandler(db *gorm.DB, messageUseCase usecase.MessageUsecase, autho
fmt.Println("NewMessageHandler")

wsUpgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
ReadBufferSize: 4096,
WriteBufferSize: 4096,
CheckOrigin: func(r *http.Request) bool { return true },
}
hub := Hub{
clients: &map[string]map[*Client]struct{}{},
clients: map[string]map[*Client]struct{}{},
// clients: make(map[*Client]struct{}),
register: make(chan *Client),
unregister: make(chan *Client),
broadcast: make(chan message),
// broadcast: make(chan []byte),
}
go hub.run()

Expand All @@ -138,53 +234,6 @@ func NewMessageHandler(db *gorm.DB, messageUseCase usecase.MessageUsecase, autho
}
}

// waitForMessage loops over frames read from conn. Each frame is decoded as a
// sentMessage; it must carry the Send action and the channel ID found in
// messageURI. Accepted messages are stored through uc and the stored record
// is pushed onto broadcast. The loop ends on a read error, a decode error, or
// a validation failure; a failed insert only skips that message.
func waitForMessage(uc usecase.MessageUsecase, db *gorm.DB, user *clerk.User, messageURI messageURI, conn *websocket.Conn, broadcast chan message) {
	fmt.Println("waitForMessage")

	for {
		frameType, payload, readErr := conn.ReadMessage()
		fmt.Println("read a message", frameType, string(payload))
		if readErr != nil {
			fmt.Println("Error reading message:", readErr)
			return
		}
		// A non-text frame is only reported; processing continues.
		if frameType != websocket.TextMessage {
			fmt.Println("Message type mismatch:", frameType)
		}

		var incoming sentMessage
		if decodeErr := json.Unmarshal(payload, &incoming); decodeErr != nil {
			fmt.Println("Error unmarshalling message:", decodeErr)
			return
		}
		fmt.Println("receiveMessage", incoming)

		switch {
		case incoming.Action != Send:
			fmt.Println("Action mismatch:", incoming.Action)
			return
		case incoming.ChannelID != messageURI.ChannelID:
			fmt.Println("Channel ID mismatch")
			return
		}

		fmt.Println("broadcast")
		stored, insertErr := uc.Insert(db, incoming.ChannelID, user.ID, incoming.Content)
		if insertErr != nil {
			fmt.Println(insertErr)
			continue
		}

		outgoing := message{
			ID:        stored.ID,
			UserID:    stored.UserID,
			UserName:  stored.UserName,
			ChannelID: stored.ChannelID,
			Content:   stored.Content,
		}
		broadcast <- outgoing
	}
}

func (mh messageHandler) HandleMessageWebSocket(ctx *gin.Context) {
fmt.Println("HandleMessageWebSocket")

< 5D32 a href="#diff-cfccd1c47bfd0da1112259914791a69e0de87f43907205145ece356b03d14790" id="expand-link-276-diff-cfccd1c47bfd0da1112259914791a69e0de87f43907205145ece356b03d14790" class="js-expand directional-expander single-expander" aria-label="Expand All" data-url="/comb19/chat_back/blob_excerpt/c5edb71c5f71442c802c8c6757972d535bc3f45e?context=pull_request&diff=unified&in_wiki_context&last_left=190&last_right=239&left=197&left_hunk_size=54&mode=100644&path=api%2Finterface%2Fhandler%2Fmessage.go&pull_request_id=2509779656&right=246&right_hunk_size=53" data-left-range="191-192" data-right-range="240-241"> Expand All @@ -197,54 +246,53 @@ func (mh messageHandler) HandleMessageWebSocket(ctx *gin.Context) {

conn, err := mh.wsUpgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
fmt.Println(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": "failed to upgrade connection"})
fmt.Println("websocket upgrade err:", err)
return
}

_, msg, err := conn.ReadMessage()
client := &Client{hub: mh.hub, conn: conn, channelID: messageURI.ChannelID, send: make(chan message, 256)}
client.hub.register <- client

msgType, msg, err := conn.ReadMessage()
if err != nil {
fmt.Println(err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": "failed to read message"})
fmt.Println("authorization msg", err)
conn.Close()
return
}
if msgType != websocket.TextMessage {
fmt.Println("authorization msg", msgType)
conn.Close()
return
}

var authorizationMessage authorizationMessage
if err := json.Unmarshal(msg, &authorizationMessage); err != nil {
fmt.Println("Error unmarshalling message:", err)
ctx.JSON(http.StatusBadRequest, gin.H{"error": "invalid message format"})
conn.Close()
return
}
fmt.Println("authorizationMessage", authorizationMessage)
if authorizationMessage.ChannelID != messageURI.ChannelID {
fmt.Println("Channel ID mismatch")
ctx.JSON(http.StatusBadRequest, gin.H{"error": "channel ID mismatch"})
conn.Close()
return
}

fmt.Println("hi")
fmt.Println(authorizationMessage.ChannelID)
fmt.Println("hi")

user, err := mh.authorizationUseCase.CheckPermission(mh.db, authorizationMessage.ChannelID, authorizationMessage.Token)
if err != nil {
fmt.Println("Error checking permission:", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"error": "failed to check permission"})
conn.Close()
return
}
if user == nil {
fmt.Println("User not found or no permission")
ctx.JSON(http.StatusForbidden, gin.H{"error": "no permission"})
conn.Close()
return
}

mh.hub.register <- &Client{
conn: conn,
userID: user.ID,
channelID: messageURI.ChannelID,
}
go client.writePump()
go client.readPump(mh.db, mh.messageUseCase, user)

go waitForMessage(mh.messageUseCase, mh.db, user, messageURI, conn, mh.hub.broadcast)
}

func (mh messageHandler) HandleMessageByID(ctx *gin.Context) {
Expand Down
0