retry.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373
  1. package dara
  2. import (
  3. "fmt"
  4. "math"
  5. "math/rand"
  6. )
  7. const (
  8. MAX_DELAY_TIME = 120 * 1000 // 120 seconds
  9. MIN_DELAY_TIME = 100 // 100 milliseconds
  10. DEFAULT_MAX_CAP = 3 * 24 * 60 * 60 * 1000 // 3 days in milliseconds
  11. MAX_ATTEMPTS = 3
  12. )
  13. // RetryPolicyContext holds context for the retry operation
  14. type RetryPolicyContext struct {
  15. Key string
  16. RetriesAttempted int
  17. HttpRequest *Request // placeholder for actual http.Request type
  18. HttpResponse *Response // placeholder for actual http.Response type
  19. Exception error
  20. }
  21. // BackoffPolicy interface with a method to get delay time
  22. type BackoffPolicy interface {
  23. GetDelayTime(ctx *RetryPolicyContext) int
  24. }
  25. // BackoffPolicyFactory creates a BackoffPolicy based on the option
  26. func BackoffPolicyFactory(option map[string]interface{}) (BackoffPolicy, error) {
  27. switch option["policy"] {
  28. case "Fixed":
  29. return NewFixedBackoffPolicy(option), nil
  30. case "Random":
  31. return NewRandomBackoffPolicy(option), nil
  32. case "Exponential":
  33. return NewExponentialBackoffPolicy(option), nil
  34. case "EqualJitter", "ExponentialWithEqualJitter":
  35. return NewEqualJitterBackoffPolicy(option), nil
  36. case "FullJitter", "ExponentialWithFullJitter":
  37. return NewFullJitterBackoffPolicy(option), nil
  38. }
  39. return nil, fmt.Errorf("unknown policy type")
  40. }
  41. // FixedBackoffPolicy implementation
  42. type FixedBackoffPolicy struct {
  43. Period int
  44. }
  45. func NewFixedBackoffPolicy(option map[string]interface{}) *FixedBackoffPolicy {
  46. var period int
  47. if v, ok := option["period"]; ok {
  48. period = v.(int)
  49. }
  50. return &FixedBackoffPolicy{
  51. Period: period,
  52. }
  53. }
  54. func (f *FixedBackoffPolicy) GetDelayTime(ctx *RetryPolicyContext) int {
  55. return f.Period
  56. }
  57. // RandomBackoffPolicy implementation
  58. type RandomBackoffPolicy struct {
  59. Period int
  60. Cap int
  61. }
  62. func NewRandomBackoffPolicy(option map[string]interface{}) *RandomBackoffPolicy {
  63. var capValue int
  64. var period int
  65. if v, ok := option["cap"]; ok {
  66. capValue = v.(int)
  67. } else {
  68. capValue = 20 * 1000
  69. }
  70. if v, ok := option["period"]; ok {
  71. period = v.(int)
  72. }
  73. return &RandomBackoffPolicy{
  74. Period: period,
  75. Cap: capValue,
  76. }
  77. }
  78. func (r *RandomBackoffPolicy) GetDelayTime(ctx *RetryPolicyContext) int {
  79. randomSeed := int64(ctx.RetriesAttempted * r.Period)
  80. randomTime := int(rand.Int63n(randomSeed))
  81. if randomTime > r.Cap {
  82. return r.Cap
  83. }
  84. return randomTime
  85. }
  86. // ExponentialBackoffPolicy implementation
  87. type ExponentialBackoffPolicy struct {
  88. Period int
  89. Cap int
  90. }
  91. func NewExponentialBackoffPolicy(option map[string]interface{}) *ExponentialBackoffPolicy {
  92. var capValue int
  93. var period int
  94. if v, ok := option["cap"]; ok {
  95. capValue = v.(int)
  96. } else {
  97. capValue = DEFAULT_MAX_CAP
  98. }
  99. if v, ok := option["period"]; ok {
  100. period = v.(int)
  101. }
  102. return &ExponentialBackoffPolicy{
  103. Period: period,
  104. Cap: capValue,
  105. }
  106. }
  107. func (e *ExponentialBackoffPolicy) GetDelayTime(ctx *RetryPolicyContext) int {
  108. randomTime := int(math.Pow(2, float64(ctx.RetriesAttempted)*float64(e.Period)))
  109. if randomTime > e.Cap {
  110. return e.Cap
  111. }
  112. return randomTime
  113. }
  114. // EqualJitterBackoffPolicy implementation
  115. type EqualJitterBackoffPolicy struct {
  116. Period int
  117. Cap int
  118. }
  119. func NewEqualJitterBackoffPolicy(option map[string]interface{}) *EqualJitterBackoffPolicy {
  120. var capValue int
  121. var period int
  122. if v, ok := option["cap"]; ok {
  123. capValue = v.(int)
  124. } else {
  125. capValue = DEFAULT_MAX_CAP
  126. }
  127. if v, ok := option["period"]; ok {
  128. period = v.(int)
  129. }
  130. return &EqualJitterBackoffPolicy{
  131. Period: period,
  132. Cap: capValue,
  133. }
  134. }
  135. func (e *EqualJitterBackoffPolicy) GetDelayTime(ctx *RetryPolicyContext) int {
  136. ceil := int64(math.Min(float64(e.Cap), float64(math.Pow(2, float64(ctx.RetriesAttempted)*float64(e.Period)))))
  137. randNum := rand.Int63n(ceil/2 + 1)
  138. return int(ceil/2 + randNum)
  139. }
  140. // FullJitterBackoffPolicy implementation
  141. type FullJitterBackoffPolicy struct {
  142. Period int
  143. Cap int
  144. }
  145. func NewFullJitterBackoffPolicy(option map[string]interface{}) *FullJitterBackoffPolicy {
  146. var capValue int
  147. var period int
  148. if v, ok := option["cap"]; ok {
  149. capValue = v.(int)
  150. } else {
  151. capValue = DEFAULT_MAX_CAP
  152. }
  153. if v, ok := option["period"]; ok {
  154. period = v.(int)
  155. }
  156. return &FullJitterBackoffPolicy{
  157. Period: period,
  158. Cap: capValue,
  159. }
  160. }
  161. func (f *FullJitterBackoffPolicy) GetDelayTime(ctx *RetryPolicyContext) int {
  162. ceil := int64(math.Min(float64(f.Cap), float64(math.Pow(2, float64(ctx.RetriesAttempted)*float64(f.Period)))))
  163. return int(rand.Int63n(ceil))
  164. }
  165. // RetryCondition holds the retry conditions
  166. type RetryCondition struct {
  167. MaxAttempts int
  168. Backoff BackoffPolicy
  169. Exception []string
  170. ErrorCode []string
  171. MaxDelay int
  172. }
  173. func NewRetryCondition(condition map[string]interface{}) *RetryCondition {
  174. var backoff BackoffPolicy
  175. if condition["backoff"] != nil {
  176. backoffOption := condition["backoff"].(map[string]interface{})
  177. backoff, _ = BackoffPolicyFactory(backoffOption)
  178. }
  179. maxAttempts, ok := condition["maxAttempts"].(int)
  180. if !ok {
  181. maxAttempts = MAX_ATTEMPTS
  182. }
  183. exception, ok := condition["exception"].([]string)
  184. if !ok {
  185. exception = []string{}
  186. }
  187. errorCode, ok := condition["errorCode"].([]string)
  188. if !ok {
  189. errorCode = []string{}
  190. }
  191. maxDelay, ok := condition["maxDelay"].(int)
  192. if !ok {
  193. maxDelay = MAX_DELAY_TIME
  194. }
  195. return &RetryCondition{
  196. MaxAttempts: maxAttempts,
  197. Backoff: backoff,
  198. Exception: exception,
  199. ErrorCode: errorCode,
  200. MaxDelay: maxDelay,
  201. }
  202. }
  203. // RetryOptions holds the retry options
  204. type RetryOptions struct {
  205. Retryable bool
  206. RetryCondition []*RetryCondition
  207. NoRetryCondition []*RetryCondition
  208. }
  209. func NewRetryOptions(options map[string]interface{}) *RetryOptions {
  210. retryConditions := make([]*RetryCondition, 0)
  211. for _, cond := range options["retryCondition"].([]interface{}) {
  212. condition := NewRetryCondition(cond.(map[string]interface{}))
  213. retryConditions = append(retryConditions, condition)
  214. }
  215. noRetryConditions := make([]*RetryCondition, 0)
  216. for _, cond := range options["noRetryCondition"].([]interface{}) {
  217. condition := NewRetryCondition(cond.(map[string]interface{}))
  218. noRetryConditions = append(noRetryConditions, condition)
  219. }
  220. return &RetryOptions{
  221. Retryable: options["retryable"].(bool),
  222. RetryCondition: retryConditions,
  223. NoRetryCondition: noRetryConditions,
  224. }
  225. }
  226. // shouldRetry determines if a retry should be attempted
  227. func ShouldRetry(options *RetryOptions, ctx *RetryPolicyContext) bool {
  228. if ctx.RetriesAttempted == 0 {
  229. return true
  230. }
  231. if options == nil || !options.Retryable {
  232. return false
  233. }
  234. retriesAttempted := ctx.RetriesAttempted
  235. ex := ctx.Exception
  236. if baseErr, ok := ex.(BaseError); ok {
  237. conditions := options.NoRetryCondition
  238. for _, condition := range conditions {
  239. for _, exc := range condition.Exception {
  240. if exc == StringValue(baseErr.GetName()) {
  241. return false
  242. }
  243. }
  244. for _, code := range condition.ErrorCode {
  245. if code == StringValue(baseErr.GetCode()) {
  246. return false
  247. }
  248. }
  249. }
  250. conditions = options.RetryCondition
  251. for _, condition := range conditions {
  252. for _, exc := range condition.Exception {
  253. if exc == StringValue(baseErr.GetName()) {
  254. if retriesAttempted >= condition.MaxAttempts {
  255. return false
  256. }
  257. return true
  258. }
  259. }
  260. for _, code := range condition.ErrorCode {
  261. if code == StringValue(baseErr.GetCode()) {
  262. if retriesAttempted >= condition.MaxAttempts {
  263. return false
  264. }
  265. return true
  266. }
  267. }
  268. }
  269. }
  270. return false
  271. }
  272. // getBackoffDelay calculates backoff delay
  273. func GetBackoffDelay(options *RetryOptions, ctx *RetryPolicyContext) int {
  274. if ctx.RetriesAttempted == 0 {
  275. return 0
  276. }
  277. if options == nil || !options.Retryable {
  278. return MIN_DELAY_TIME
  279. }
  280. ex := ctx.Exception
  281. conditions := options.RetryCondition
  282. if baseErr, ok := ex.(BaseError); ok {
  283. for _, condition := range conditions {
  284. for _, exc := range condition.Exception {
  285. if exc == StringValue(baseErr.GetName()) {
  286. maxDelay := condition.MaxDelay
  287. // Simulated "retryAfter" from an error response
  288. if respErr, ok := ex.(ResponseError); ok {
  289. retryAfter := Int64Value(respErr.GetRetryAfter())
  290. if retryAfter != 0 {
  291. return min(int(retryAfter), maxDelay)
  292. }
  293. }
  294. // This would be set properly based on your error handling
  295. if condition.Backoff == nil {
  296. return MIN_DELAY_TIME
  297. }
  298. return min(condition.Backoff.GetDelayTime(ctx), maxDelay)
  299. }
  300. }
  301. for _, code := range condition.ErrorCode {
  302. if code == StringValue(baseErr.GetCode()) {
  303. maxDelay := condition.MaxDelay
  304. // Simulated "retryAfter" from an error response
  305. if respErr, ok := ex.(ResponseError); ok {
  306. retryAfter := Int64Value(respErr.GetRetryAfter())
  307. if retryAfter != 0 {
  308. return min(int(retryAfter), maxDelay)
  309. }
  310. }
  311. if condition.Backoff == nil {
  312. return MIN_DELAY_TIME
  313. }
  314. return min(condition.Backoff.GetDelayTime(ctx), maxDelay)
  315. }
  316. }
  317. }
  318. }
  319. return MIN_DELAY_TIME
  320. }
  321. // helper function to find the minimum of two values
  322. func min(a, b int) int {
  323. if a < b {
  324. return a
  325. }
  326. return b
  327. }