websocketを使ってサーバーとの相互通信をしたい。
gin-gonic/gin と gorilla/websocket を使って動作確認してみます。
WebSocketの基本
従来のクライアント起点の通信から、サーバ・クライアント間の双方向通信のために作られた仕組み。
http:
、https:
のかわりにws:
とwss:
というURIスキームを用いる。
GOパッケージ / WebSocket
公式の拡張パッケージがありますが、他のパッケージにある機能が欠けてるとのことなのでgorilla/websocketを使用します。
githubを眺めるとgobwas/wsやolahol/melodyなんかも気になるところ。
特にmelodyはjavascriptっぽいイベントハンドリングが出来そうに見える。
サンプル
基本的なクライアント・サーバー処理は examples/echo がわかりやすいです。
外部だとBitFlyerの仮想通貨APIがgorillaでjson-rpc使用のサンプルがありました。
BitFlyerのサンプルはそのままだと -32600 Invalid Request となるので送受信JSONを分けて使うように変更が必要です。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
type JsonRPC2Send struct { Version string `json:"jsonrpc"` Method string `json:"method"` Params interface{} `json:"params"` Id *int `json:"id"` } type JsonRPC2Rcv struct { Version string `json:"jsonrpc"` Method string `json:"method"` Params interface{} `json:"params"` Result interface{} `json:"result"` Error interface{} `json:"error"` Id *int `json:"id"` } |
クライアント処理
コードはexamples/echoを見るとして基本的に3ステップです。
- コネクションを張る
- 受信用のルーチンを作る
- (定期的に何か送ったりしつつ)待機する
ReadMessage/ReadJSON
で受信処理し、WriteMessage/WriteJSON
で送信処理します。
サーバ処理
こっちはもっとシンプルに2ステップ。
-
Upgrader
をセットアップ - 受信を待ったり(クライアント主動)、タイマーなんかのイベント(サーバ主動)を待って送信する
サンプルコード
1ファイルでサーバーとクライアントを同時に立ち上げるサンプルです。
サンプルつぎはぎですがクライアント側はOSシグナル処理をすると終了時にサーバのみ生き残ってしまうのでコメントアウトしています。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
package main import ( "log" "net/http" "net/url" "time" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" ) /*--- Server ---*/ var wsupgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, } func wshandler(w http.ResponseWriter, r *http.Request) { conn, err := wsupgrader.Upgrade(w, r, nil) if err != nil { log.Println("Failed to set websocket upgrade: %+v", err) return } ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case t := <-ticker.C: conn.WriteMessage(websocket.TextMessage, []byte(t.String())) } } //何か受け取ってそのまま返すパターン /* for { t, msg, err := conn.ReadMessage() if err != nil { break } conn.WriteMessage(t, msg) } */ } func main() { r := gin.Default() r.GET("/ws", func(c *gin.Context) { wshandler(c.Writer, c.Request) }) go clientStart() r.Run(":8181") } /* --- Client --- */ func clientStart() { u := url.URL{Scheme: "ws", Host: "localhost:8181", Path: "/ws"} log.Printf("connecting to %s", u.String()) c, _, err := websocket.DefaultDialer.Dial(u.String(), nil) if err != nil { log.Println("dial:", err) } defer c.Close() done := make(chan struct{}) go receive(c, done) waitloop(c, done) } func receive(c *websocket.Conn, done chan struct{}) { defer close(done) for { _, message, err := c.ReadMessage() if err != nil { log.Println("read:", err) return } log.Printf("recv: %s", message) } } func waitloop(c *websocket.Conn, done chan struct{}) { //停止用 //interrupt := make(chan os.Signal, 1) //signal.Notify(interrupt, os.Interrupt) for { select { case <-done: log.Println("done") return /* case <-interrupt: log.Println("interrupt") err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Println("write close:", err) return } select { case <-done: case <-time.After(time.Second): } return */ } } } |
クライアント側処理で時間をrecv:
していれば成功です。
このコードだとgo clientStart()
を増やすとそれぞれにタイマーが動きます。
ちゃんとブロードキャスト(一斉送信)したい場合にはコネクションを保存しておく必要があります。ちょうどその管理をしているのがexamples/chatのHubです。
今回のサンプルで簡潔に書くとこんな感じ。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
var conArr = make([]*websocket.Conn, 0) ... func wshandler(w http.ResponseWriter, r *http.Request) { conn, err := wsupgrader.Upgrade(w, r, nil) if err != nil { log.Println("Failed to set websocket upgrade: %+v", err) return } conArr = append(conArr, conn) } ... main(){ ... go clientStart() go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { case t := <-ticker.C: for _, conn := range conArr { conn.WriteMessage(websocket.TextMessage, []byte(t.String())) } } } }() ... } |
これでサーバーイベントに沿ってクライアントに一斉送信します。
メモリ管理
サーバのUpgrader
でバッファの管理をしています。試しにこんな感じで実行。
1 2 3 4 |
var wsupgrader = websocket.Upgrader{ ReadBufferSize: 1024*1024*1024, WriteBufferSize: 1024*1024*1024, } |
タスクマネージャでメモリを見たところサンプル通りの1クライアント接続で70MB、3クライアントのルーチンにしてみたら200MBになった。
ReadBufferSize and WriteBufferSize specify I/O buffer sizes in bytes
らしいので(1GB+1GB) * クライアント数
で6Gかと思ったけどそうでもない。
ただバッファサイズとクライアント数に比例して必要メモリ量が増えます。
なのでクローズ管理はちゃんとやらないといけない。