unbounded_executor.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package concurrent
  2. import (
  3. "context"
  4. "fmt"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "runtime/debug"
  9. )
  10. var LogInfo = func(event string, properties ...interface{}) {
  11. }
  12. var LogPanic = func(recovered interface{}, properties ...interface{}) interface{} {
  13. fmt.Println(fmt.Sprintf("paniced: %v", recovered))
  14. debug.PrintStack()
  15. return recovered
  16. }
  17. const StopSignal = "STOP!"
  18. type UnboundedExecutor struct {
  19. ctx context.Context
  20. cancel context.CancelFunc
  21. activeGoroutinesMutex *sync.Mutex
  22. activeGoroutines map[string]int
  23. }
  24. // GlobalUnboundedExecutor has the life cycle of the program itself
  25. // any goroutine want to be shutdown before main exit can be started from this executor
  26. var GlobalUnboundedExecutor = NewUnboundedExecutor()
  27. func NewUnboundedExecutor() *UnboundedExecutor {
  28. ctx, cancel := context.WithCancel(context.TODO())
  29. return &UnboundedExecutor{
  30. ctx: ctx,
  31. cancel: cancel,
  32. activeGoroutinesMutex: &sync.Mutex{},
  33. activeGoroutines: map[string]int{},
  34. }
  35. }
  36. func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
  37. _, file, line, _ := runtime.Caller(1)
  38. executor.activeGoroutinesMutex.Lock()
  39. defer executor.activeGoroutinesMutex.Unlock()
  40. startFrom := fmt.Sprintf("%s:%d", file, line)
  41. executor.activeGoroutines[startFrom] += 1
  42. go func() {
  43. defer func() {
  44. recovered := recover()
  45. if recovered != nil && recovered != StopSignal {
  46. LogPanic(recovered)
  47. }
  48. executor.activeGoroutinesMutex.Lock()
  49. defer executor.activeGoroutinesMutex.Unlock()
  50. executor.activeGoroutines[startFrom] -= 1
  51. }()
  52. handler(executor.ctx)
  53. }()
  54. }
  55. func (executor *UnboundedExecutor) Stop() {
  56. executor.cancel()
  57. }
  58. func (executor *UnboundedExecutor) StopAndWaitForever() {
  59. executor.StopAndWait(context.Background())
  60. }
  61. func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
  62. executor.cancel()
  63. for {
  64. fiveSeconds := time.NewTimer(time.Millisecond * 100)
  65. select {
  66. case <-fiveSeconds.C:
  67. case <-ctx.Done():
  68. return
  69. }
  70. if executor.checkGoroutines() {
  71. return
  72. }
  73. }
  74. }
  75. func (executor *UnboundedExecutor) checkGoroutines() bool {
  76. executor.activeGoroutinesMutex.Lock()
  77. defer executor.activeGoroutinesMutex.Unlock()
  78. for startFrom, count := range executor.activeGoroutines {
  79. if count > 0 {
  80. LogInfo("event!unbounded_executor.still waiting goroutines to quit",
  81. "startFrom", startFrom,
  82. "count", count)
  83. return false
  84. }
  85. }
  86. return true
  87. }