stream.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package dara
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "io"
  7. "io/ioutil"
  8. "strings"
  9. "fmt"
  10. )
  11. // 定义 Event 结构体
  12. type SSEEvent struct {
  13. ID *string
  14. Event *string
  15. Data *string
  16. Retry *int
  17. }
  18. // 解析单个事件
  19. func parseEvent(lines []string) *SSEEvent {
  20. event := &SSEEvent{}
  21. for _, line := range lines {
  22. if strings.HasPrefix(line, "data: ") {
  23. data := strings.TrimPrefix(line, "data: ") + "\n"
  24. if event.Data == nil {
  25. event.Data = new(string)
  26. }
  27. *event.Data += data
  28. } else if strings.HasPrefix(line, "event: ") {
  29. eventName := strings.TrimPrefix(line, "event: ")
  30. event.Event = &eventName
  31. } else if strings.HasPrefix(line, "id: ") {
  32. id := strings.TrimPrefix(line, "id: ")
  33. event.ID = &id
  34. } else if strings.HasPrefix(line, "retry: ") {
  35. var retry int
  36. fmt.Sscanf(strings.TrimPrefix(line, "retry: "), "%d", &retry)
  37. event.Retry = &retry
  38. }
  39. }
  40. // Remove last newline from data
  41. if event.Data != nil {
  42. data := strings.TrimRight(*event.Data, "\n")
  43. event.Data = &data
  44. }
  45. return event
  46. }
  47. func ReadAsBytes(body io.Reader) ([]byte, error) {
  48. byt, err := ioutil.ReadAll(body)
  49. if err != nil {
  50. return nil, err
  51. }
  52. r, ok := body.(io.ReadCloser)
  53. if ok {
  54. r.Close()
  55. }
  56. return byt, nil
  57. }
  58. func ReadAsJSON(body io.Reader) (result interface{}, err error) {
  59. byt, err := ioutil.ReadAll(body)
  60. if err != nil {
  61. return
  62. }
  63. if string(byt) == "" {
  64. return
  65. }
  66. r, ok := body.(io.ReadCloser)
  67. if ok {
  68. r.Close()
  69. }
  70. d := json.NewDecoder(bytes.NewReader(byt))
  71. d.UseNumber()
  72. err = d.Decode(&result)
  73. return
  74. }
  75. func ReadAsString(body io.Reader) (string, error) {
  76. byt, err := ioutil.ReadAll(body)
  77. if err != nil {
  78. return "", err
  79. }
  80. r, ok := body.(io.ReadCloser)
  81. if ok {
  82. r.Close()
  83. }
  84. return string(byt), nil
  85. }
  86. func ReadAsSSE(body io.ReadCloser, eventChannel chan *SSEEvent, errorChannel chan error) {
  87. go func() {
  88. defer func() {
  89. body.Close()
  90. close(eventChannel)
  91. }()
  92. reader := bufio.NewReader(body)
  93. var eventLines []string
  94. for {
  95. line, err := reader.ReadString('\n')
  96. if err != nil {
  97. if err == io.EOF {
  98. // Handle the end of the stream and possibly pending event
  99. if len(eventLines) > 0 {
  100. event := parseEvent(eventLines)
  101. eventChannel <- event
  102. }
  103. errorChannel <- nil
  104. return
  105. }
  106. errorChannel <- err
  107. return
  108. }
  109. line = strings.TrimRight(line, "\n")
  110. if line == "" {
  111. // End of an SSE event
  112. if len(eventLines) > 0 {
  113. event := parseEvent(eventLines)
  114. eventChannel <- event
  115. eventLines = []string{} // Reset for the next event
  116. }
  117. continue
  118. }
  119. eventLines = append(eventLines, line)
  120. }
  121. }()
  122. return
  123. }