gqueue.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package gqueue provides dynamic/static concurrent-safe queue.
  7. //
  8. // Features:
  9. //
  10. // 1. FIFO queue(data -> list -> chan);
  11. //
  12. // 2. Fast creation and initialization;
  13. //
  14. // 3. Support dynamic queue size(unlimited queue size);
  15. //
  16. // 4. Blocking when reading data from queue;
  17. //
  18. package gqueue
  19. import (
  20. "math"
  21. "github.com/gogf/gf/container/glist"
  22. "github.com/gogf/gf/container/gtype"
  23. )
  24. // Queue is a concurrent-safe queue built on doubly linked list and channel.
  25. type Queue struct {
  26. limit int // Limit for queue size.
  27. list *glist.List // Underlying list structure for data maintaining.
  28. closed *gtype.Bool // Whether queue is closed.
  29. events chan struct{} // Events for data writing.
  30. C chan interface{} // Underlying channel for data reading.
  31. }
  32. const (
  33. defaultQueueSize = 10000 // Size for queue buffer.
  34. defaultBatchSize = 10 // Max batch size per-fetching from list.
  35. )
  36. // New returns an empty queue object.
  37. // Optional parameter <limit> is used to limit the size of the queue, which is unlimited in default.
  38. // When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel.
  39. func New(limit ...int) *Queue {
  40. q := &Queue{
  41. closed: gtype.NewBool(),
  42. }
  43. if len(limit) > 0 && limit[0] > 0 {
  44. q.limit = limit[0]
  45. q.C = make(chan interface{}, limit[0])
  46. } else {
  47. q.list = glist.New(true)
  48. q.events = make(chan struct{}, math.MaxInt32)
  49. q.C = make(chan interface{}, defaultQueueSize)
  50. go q.asyncLoopFromListToChannel()
  51. }
  52. return q
  53. }
  54. // asyncLoopFromListToChannel starts an asynchronous goroutine,
  55. // which handles the data synchronization from list <q.list> to channel <q.C>.
  56. func (q *Queue) asyncLoopFromListToChannel() {
  57. defer func() {
  58. if q.closed.Val() {
  59. _ = recover()
  60. }
  61. }()
  62. for !q.closed.Val() {
  63. <-q.events
  64. for !q.closed.Val() {
  65. if length := q.list.Len(); length > 0 {
  66. if length > defaultBatchSize {
  67. length = defaultBatchSize
  68. }
  69. for _, v := range q.list.PopFronts(length) {
  70. // When q.C is closed, it will panic here, especially q.C is being blocked for writing.
  71. // If any error occurs here, it will be caught by recover and be ignored.
  72. q.C <- v
  73. }
  74. } else {
  75. break
  76. }
  77. }
  78. // Clear q.events to remain just one event to do the next synchronization check.
  79. for i := 0; i < len(q.events)-1; i++ {
  80. <-q.events
  81. }
  82. }
  83. // It should be here to close q.C if <q> is unlimited size.
  84. // It's the sender's responsibility to close channel when it should be closed.
  85. close(q.C)
  86. }
  87. // Push pushes the data <v> into the queue.
  88. // Note that it would panics if Push is called after the queue is closed.
  89. func (q *Queue) Push(v interface{}) {
  90. if q.limit > 0 {
  91. q.C <- v
  92. } else {
  93. q.list.PushBack(v)
  94. if len(q.events) < defaultQueueSize {
  95. q.events <- struct{}{}
  96. }
  97. }
  98. }
  99. // Pop pops an item from the queue in FIFO way.
  100. // Note that it would return nil immediately if Pop is called after the queue is closed.
  101. func (q *Queue) Pop() interface{} {
  102. return <-q.C
  103. }
  104. // Close closes the queue.
  105. // Notice: It would notify all goroutines return immediately,
  106. // which are being blocked reading using Pop method.
  107. func (q *Queue) Close() {
  108. q.closed.Set(true)
  109. if q.events != nil {
  110. close(q.events)
  111. }
  112. if q.limit > 0 {
  113. close(q.C)
  114. }
  115. for i := 0; i < defaultBatchSize; i++ {
  116. q.Pop()
  117. }
  118. }
  119. // Len returns the length of the queue.
  120. // Note that the result might not be accurate as there's a
  121. // asynchronous channel reading the list constantly.
  122. func (q *Queue) Len() (length int) {
  123. if q.list != nil {
  124. length += q.list.Len()
  125. }
  126. length += len(q.C)
  127. return
  128. }
  129. // Size is alias of Len.
  130. func (q *Queue) Size() int {
  131. return q.Len()
  132. }