// binance.go (4.6 KB)

package binance

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"strings"
	"time"

	"golang.org/x/net/websocket"
)
  9. const (
  10. XRPBTC = "xrpbtc"
  11. )
  12. type Options struct {
  13. Symbol string
  14. LookbackLimit int
  15. }
  16. func NewBinanceStream(opts Options) BinanceStream {
  17. return &binanceStream{
  18. Options: opts,
  19. LastUpdateID: -1,
  20. DepthEventChannel: make(chan DepthEvent),
  21. ErrorChannel: make(chan error),
  22. }
  23. }
  24. type binanceStream struct {
  25. Options Options
  26. LastUpdateID int64
  27. DepthEventChannel chan DepthEvent
  28. ErrorChannel chan error
  29. }
  30. type BinanceStream interface {
  31. Start()
  32. GetDepthEventChannel() chan DepthEvent
  33. GetErrorChannel() chan error
  34. }
  35. type BidCount struct {
  36. Price string
  37. Volume string
  38. }
  39. type Bids []*BidCount
  40. type DepthEvent struct {
  41. EventType string
  42. EventTime int64
  43. Symbol string
  44. FirstUpdateID int64
  45. FinalUpdateID int64
  46. BidsToUpdate Bids
  47. AsksToUpdate Bids
  48. }
  49. type rawBids [][]interface{}
  50. type depth struct {
  51. LastUpdateID int64 `json:"lastUpdateId"`
  52. Bids rawBids `json:"bids"`
  53. }
  54. type depthEvent struct {
  55. EventType string `json:"e"`
  56. EventTime int64 `json:"E"`
  57. Symbol string `json:"s"`
  58. FirstUpdateID int64 `json:"U"`
  59. FinalUpdateID int64 `json:"u"`
  60. BidsToUpdate rawBids `json:"b"`
  61. AsksToUpdate rawBids `json:"a"`
  62. }
  63. func (b *binanceStream) GetDepthEventChannel() chan DepthEvent {
  64. return b.DepthEventChannel
  65. }
  66. func (b *binanceStream) GetErrorChannel() chan error {
  67. return b.ErrorChannel
  68. }
  69. func (b *binanceStream) Start() {
  70. go func() {
  71. e := b.getDepth(b.Options.Symbol, 1000)
  72. if e != nil {
  73. b.ErrorChannel <- e
  74. }
  75. b.processDepthEvents()
  76. }()
  77. }
  78. func transformRawBids(rawBids rawBids) Bids {
  79. bids := Bids{}
  80. for _, bSet := range rawBids {
  81. price := ""
  82. vol := ""
  83. for idx, rBid := range bSet {
  84. val, ok := rBid.(string)
  85. if ok {
  86. if idx == 0 {
  87. price = val
  88. }
  89. if idx == 1 {
  90. vol = val
  91. }
  92. }
  93. }
  94. bids = append(bids, &BidCount{
  95. Price: price,
  96. Volume: vol,
  97. })
  98. }
  99. return bids
  100. }
  101. func (b *binanceStream) processTradeEvents() {
  102. origin := "http://localhost/"
  103. url := b.getWebsocketURL("trade")
  104. ws, err := websocket.Dial(url, "", origin)
  105. if err != nil {
  106. b.ErrorChannel <- err
  107. return
  108. }
  109. if _, err := ws.Write(nil); err != nil {
  110. b.ErrorChannel <- err
  111. return
  112. }
  113. var msg = make([]byte, 4096)
  114. var n int
  115. n, err = ws.Read(msg)
  116. if err != nil {
  117. b.ErrorChannel <- err
  118. return
  119. }
  120. for err == nil {
  121. evt := depthEvent{}
  122. e := json.Unmarshal(msg[:n], &evt)
  123. if e != nil {
  124. b.ErrorChannel <- e
  125. return
  126. }
  127. if evt.FinalUpdateID > b.LastUpdateID {
  128. b.DepthEventChannel <- DepthEvent{
  129. EventType: evt.EventType,
  130. EventTime: evt.EventTime,
  131. Symbol: evt.Symbol,
  132. FirstUpdateID: evt.FirstUpdateID,
  133. FinalUpdateID: evt.FinalUpdateID,
  134. BidsToUpdate: transformRawBids(evt.BidsToUpdate),
  135. AsksToUpdate: transformRawBids(evt.AsksToUpdate),
  136. }
  137. }
  138. n, err = ws.Read(msg)
  139. }
  140. }
  141. func (b *binanceStream) getWebsocketURL(view string) string {
  142. return fmt.Sprintf("wss://stream.binance.com:9443/ws/%s@%s", b.Options.Symbol, view)
  143. }
  144. func (b *binanceStream) processDepthEvents() {
  145. origin := "http://localhost/"
  146. url := b.getWebsocketURL("depth")
  147. ws, err := websocket.Dial(url, "", origin)
  148. if err != nil {
  149. fmt.Println(err)
  150. b.ErrorChannel <- err
  151. return
  152. }
  153. if _, err := ws.Write(nil); err != nil {
  154. b.ErrorChannel <- err
  155. return
  156. }
  157. var msg = make([]byte, 4096)
  158. var n int
  159. n, err = ws.Read(msg)
  160. if err != nil {
  161. b.ErrorChannel <- err
  162. return
  163. }
  164. for err == nil {
  165. evt := depthEvent{}
  166. e := json.Unmarshal(msg[:n], &evt)
  167. if e != nil {
  168. fmt.Println(msg)
  169. b.ErrorChannel <- e
  170. return
  171. }
  172. if evt.FinalUpdateID > b.LastUpdateID {
  173. b.DepthEventChannel <- DepthEvent{
  174. EventType: evt.EventType,
  175. EventTime: evt.EventTime,
  176. Symbol: evt.Symbol,
  177. FirstUpdateID: evt.FirstUpdateID,
  178. FinalUpdateID: evt.FinalUpdateID,
  179. BidsToUpdate: transformRawBids(evt.BidsToUpdate),
  180. AsksToUpdate: transformRawBids(evt.AsksToUpdate),
  181. }
  182. }
  183. n, err = ws.Read(msg)
  184. }
  185. }
  186. func (b *binanceStream) getDepth(symbol string, limit int) error {
  187. req, err := http.NewRequest("GET", fmt.Sprintf("https://www.binance.com/api/v1/depth?symbol=%s&limit=%d", symbol, limit), nil)
  188. if err != nil {
  189. return err
  190. }
  191. resp, err := http.DefaultClient.Do(req)
  192. if err != nil {
  193. return err
  194. }
  195. defer resp.Body.Close()
  196. rawData, err := ioutil.ReadAll(resp.Body)
  197. if err != nil {
  198. return err
  199. }
  200. depth := &depth{}
  201. err = json.Unmarshal(rawData, depth)
  202. if err != nil {
  203. return err
  204. }
  205. b.LastUpdateID = depth.LastUpdateID
  206. return nil
  207. }