grpool.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. // Package grpool implements a goroutine reusable pool.
  7. package grpool
  8. import (
  9. "github.com/gogf/gf/errors/gcode"
  10. "github.com/gogf/gf/errors/gerror"
  11. "github.com/gogf/gf/container/glist"
  12. "github.com/gogf/gf/container/gtype"
  13. )
  14. // Goroutine Pool
  15. type Pool struct {
  16. limit int // Max goroutine count limit.
  17. count *gtype.Int // Current running goroutine count.
  18. list *glist.List // Job list for asynchronous job adding purpose.
  19. closed *gtype.Bool // Is pool closed or not.
  20. }
  21. // Default goroutine pool.
  22. var pool = New()
  23. // New creates and returns a new goroutine pool object.
  24. // The parameter <limit> is used to limit the max goroutine count,
  25. // which is not limited in default.
  26. func New(limit ...int) *Pool {
  27. p := &Pool{
  28. limit: -1,
  29. count: gtype.NewInt(),
  30. list: glist.New(true),
  31. closed: gtype.NewBool(),
  32. }
  33. if len(limit) > 0 && limit[0] > 0 {
  34. p.limit = limit[0]
  35. }
  36. return p
  37. }
  38. // Add pushes a new job to the pool using default goroutine pool.
  39. // The job will be executed asynchronously.
  40. func Add(f func()) error {
  41. return pool.Add(f)
  42. }
  43. // AddWithRecover pushes a new job to the pool with specified recover function.
  44. // The optional <recoverFunc> is called when any panic during executing of <userFunc>.
  45. // If <recoverFunc> is not passed or given nil, it ignores the panic from <userFunc>.
  46. // The job will be executed asynchronously.
  47. func AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
  48. return pool.AddWithRecover(userFunc, recoverFunc...)
  49. }
  50. // Size returns current goroutine count of default goroutine pool.
  51. func Size() int {
  52. return pool.Size()
  53. }
  54. // Jobs returns current job count of default goroutine pool.
  55. func Jobs() int {
  56. return pool.Jobs()
  57. }
  58. // Add pushes a new job to the pool.
  59. // The job will be executed asynchronously.
  60. func (p *Pool) Add(f func()) error {
  61. for p.closed.Val() {
  62. return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed")
  63. }
  64. p.list.PushFront(f)
  65. // Check whether fork new goroutine or not.
  66. var n int
  67. for {
  68. n = p.count.Val()
  69. if p.limit != -1 && n >= p.limit {
  70. // No need fork new goroutine.
  71. return nil
  72. }
  73. if p.count.Cas(n, n+1) {
  74. // Use CAS to guarantee atomicity.
  75. break
  76. }
  77. }
  78. p.fork()
  79. return nil
  80. }
  81. // AddWithRecover pushes a new job to the pool with specified recover function.
  82. // The optional <recoverFunc> is called when any panic during executing of <userFunc>.
  83. // If <recoverFunc> is not passed or given nil, it ignores the panic from <userFunc>.
  84. // The job will be executed asynchronously.
  85. func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
  86. return p.Add(func() {
  87. defer func() {
  88. if exception := recover(); exception != nil {
  89. if len(recoverFunc) > 0 && recoverFunc[0] != nil {
  90. if err, ok := exception.(error); ok {
  91. recoverFunc[0](err)
  92. } else {
  93. recoverFunc[0](gerror.NewCodef(gcode.CodeInternalError, `%v`, exception))
  94. }
  95. }
  96. }
  97. }()
  98. userFunc()
  99. })
  100. }
  101. // Cap returns the capacity of the pool.
  102. // This capacity is defined when pool is created.
  103. // It returns -1 if there's no limit.
  104. func (p *Pool) Cap() int {
  105. return p.limit
  106. }
  107. // Size returns current goroutine count of the pool.
  108. func (p *Pool) Size() int {
  109. return p.count.Val()
  110. }
  111. // Jobs returns current job count of the pool.
  112. // Note that, it does not return worker/goroutine count but the job/task count.
  113. func (p *Pool) Jobs() int {
  114. return p.list.Size()
  115. }
  116. // fork creates a new goroutine worker.
  117. // Note that the worker dies if the job function panics.
  118. func (p *Pool) fork() {
  119. go func() {
  120. defer p.count.Add(-1)
  121. var job interface{}
  122. for !p.closed.Val() {
  123. if job = p.list.PopBack(); job != nil {
  124. job.(func())()
  125. } else {
  126. return
  127. }
  128. }
  129. }()
  130. }
  131. // IsClosed returns if pool is closed.
  132. func (p *Pool) IsClosed() bool {
  133. return p.closed.Val()
  134. }
  135. // Close closes the goroutine pool, which makes all goroutines exit.
  136. func (p *Pool) Close() {
  137. p.closed.Set(true)
  138. }