// Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to
// reduce copying and to allow reuse of individual chunks.
package buffer

import (
	"io"
	"net"
	"sync"
)
// PoolConfig contains configuration for the allocation and reuse strategy.
type PoolConfig struct {
	StartSize  int // Minimum chunk size that is allocated.
	PooledSize int // Minimum chunk size that is reused; reusing chunks that are too small results in overhead.
	MaxSize    int // Maximum chunk size that will be allocated.
}

var config = PoolConfig{
	StartSize:  128,
	PooledSize: 512,
	MaxSize:    32768,
}
// Reuse pool: chunk size -> pool.
var buffers = map[int]*sync.Pool{}

// initBuffers creates one reuse pool per power-of-two chunk size between
// PooledSize and MaxSize.
func initBuffers() {
	for l := config.PooledSize; l <= config.MaxSize; l *= 2 {
		buffers[l] = new(sync.Pool)
	}
}

func init() {
	initBuffers()
}
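// With the default config the pools above are keyed by the sizes 512, 1024,
// 2048, ..., 32768. A chunk whose capacity does not exactly match one of these
// keys is never pooled: putBuf drops it and getBuf falls through to make.
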
// Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done.
func Init(cfg PoolConfig) {
	config = cfg
	initBuffers()
}
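// A minimal Init sketch (hypothetical values): a caller that serializes large
// payloads can trade memory for fewer allocations by raising the limits. This
// belongs in the importing package's startup path, before any Buffer is used:
//
//	buffer.Init(buffer.PoolConfig{
//		StartSize:  512,
//		PooledSize: 1024,
//		MaxSize:    1 << 20, // cap chunks at 1 MiB
//	})
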
// putBuf returns a chunk to the reuse pool if it is large enough to be reused.
func putBuf(buf []byte) {
	size := cap(buf)
	if size < config.PooledSize {
		return
	}
	if c := buffers[size]; c != nil {
		c.Put(buf[:0])
	}
}
// getBuf gets a chunk from the reuse pool or creates a new one if reuse failed.
func getBuf(size int) []byte {
	if size >= config.PooledSize {
		if c := buffers[size]; c != nil {
			v := c.Get()
			if v != nil {
				return v.([]byte)
			}
		}
	}
	return make([]byte, 0, size)
}
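// A minimal round-trip sketch: a chunk of a pooled size returned via putBuf
// can be handed out again by getBuf with zero length and its capacity intact.
// sync.Pool gives no reuse guarantee, so "may" is the strongest claim here:
//
//	chunk := getBuf(512)                // len 0, cap 512
//	chunk = append(chunk, "payload"...) // fits, no reallocation
//	putBuf(chunk)                       // eligible: cap is exactly 512 >= PooledSize
//	chunk = getBuf(512)                 // may return the same backing array, len 0
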
// Buffer is a buffer optimized for serialization without extra copying.
type Buffer struct {
	// Buf is the current chunk that can be used for serialization.
	Buf []byte

	toPool []byte   // Chunk obtained from the pool; differs from Buf if append reallocated it.
	bufs   [][]byte // Previous chunks, already filled, in order.
}
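// The zero value of Buffer is ready to use; the first chunk is allocated
// lazily on the first append:
//
//	var b Buffer        // no allocation yet
//	b.AppendString("x") // first chunk (StartSize bytes) allocated here
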
// EnsureSpace makes sure that the current chunk contains at least s free bytes,
// possibly creating a new chunk.
func (b *Buffer) EnsureSpace(s int) {
	if cap(b.Buf)-len(b.Buf) < s {
		b.ensureSpaceSlow(s)
	}
}

func (b *Buffer) ensureSpaceSlow(s int) {
	l := len(b.Buf)
	if l > 0 {
		if cap(b.toPool) != cap(b.Buf) {
			// Chunk was reallocated, toPool can be pooled.
			putBuf(b.toPool)
		}
		if cap(b.bufs) == 0 {
			b.bufs = make([][]byte, 0, 8)
		}
		// Retire the current chunk and double the size for the next one.
		b.bufs = append(b.bufs, b.Buf)
		l = cap(b.toPool) * 2
	} else {
		l = config.StartSize
	}

	if l > config.MaxSize {
		l = config.MaxSize
	}
	b.Buf = getBuf(l)
	b.toPool = b.Buf
}
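// Growth behavior, sketched with the default config: chunk capacities double
// from StartSize until MaxSize caps them (128, 256, 512, ..., 32768, 32768, ...),
// and only chunks of at least PooledSize bytes are ever returned to the pools:
//
//	var b Buffer
//	b.EnsureSpace(1)           // first chunk: cap(b.Buf) == 128
//	b.Buf = b.Buf[:cap(b.Buf)] // pretend the chunk was filled
//	b.EnsureSpace(1)           // retires the full chunk; next cap is 256
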
// AppendByte appends a single byte to the buffer.
func (b *Buffer) AppendByte(data byte) {
	b.EnsureSpace(1)
	b.Buf = append(b.Buf, data)
}
// AppendBytes appends a byte slice to the buffer.
func (b *Buffer) AppendBytes(data []byte) {
	if len(data) <= cap(b.Buf)-len(b.Buf) {
		b.Buf = append(b.Buf, data...) // fast path
	} else {
		b.appendBytesSlow(data)
	}
}

func (b *Buffer) appendBytesSlow(data []byte) {
	for len(data) > 0 {
		b.EnsureSpace(1)
		// Copy as much as fits into the current chunk.
		sz := cap(b.Buf) - len(b.Buf)
		if sz > len(data) {
			sz = len(data)
		}
		b.Buf = append(b.Buf, data[:sz]...)
		data = data[sz:]
	}
}
// AppendString appends a string to the buffer.
func (b *Buffer) AppendString(data string) {
	if len(data) <= cap(b.Buf)-len(b.Buf) {
		b.Buf = append(b.Buf, data...) // fast path
	} else {
		b.appendStringSlow(data)
	}
}

func (b *Buffer) appendStringSlow(data string) {
	for len(data) > 0 {
		b.EnsureSpace(1)
		// Copy as much as fits into the current chunk.
		sz := cap(b.Buf) - len(b.Buf)
		if sz > len(data) {
			sz = len(data)
		}
		b.Buf = append(b.Buf, data[:sz]...)
		data = data[sz:]
	}
}
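// A minimal append sketch: the Append* methods are how data should enter the
// buffer, since they maintain the chunk chain behind Buf:
//
//	var b Buffer
//	b.AppendString(`{"key":`)
//	b.AppendByte(' ')
//	b.AppendBytes([]byte(`"value"}`))
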
// Size computes the size of a buffer by adding the lengths of every chunk.
func (b *Buffer) Size() int {
	size := len(b.Buf)
	for _, buf := range b.bufs {
		size += len(buf)
	}
	return size
}
// DumpTo outputs the contents of a buffer to a writer and resets the buffer.
func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
	// net.Buffers lets writers that support it (e.g. a net.Conn) use vectored I/O.
	bufs := net.Buffers(b.bufs)
	if len(b.Buf) > 0 {
		bufs = append(bufs, b.Buf)
	}
	n, err := bufs.WriteTo(w)

	for _, buf := range b.bufs {
		putBuf(buf)
	}
	putBuf(b.toPool)

	b.bufs = nil
	b.Buf = nil
	b.toPool = nil

	return int(n), err
}
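// A minimal DumpTo sketch: flush the chunks to any io.Writer (a bytes.Buffer
// here); the Buffer is reset and its chunks are returned to the pools:
//
//	var b Buffer
//	b.AppendString("hello")
//	var out bytes.Buffer
//	n, err := b.DumpTo(&out) // n == 5; b is empty again
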
// BuildBytes creates a single byte slice with all the contents of the buffer. Data is
// copied if it does not fit in a single chunk. You can optionally provide one byte
// slice as argument that it will try to reuse.
func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
	if len(b.bufs) == 0 {
		// Single chunk: hand it out directly, nothing to copy.
		ret := b.Buf
		b.toPool = nil
		b.Buf = nil
		return ret
	}

	var ret []byte
	size := b.Size()

	// If we got a buffer as argument and it is big enough, reuse it.
	if len(reuse) == 1 && cap(reuse[0]) >= size {
		ret = reuse[0][:0]
	} else {
		ret = make([]byte, 0, size)
	}
	for _, buf := range b.bufs {
		ret = append(ret, buf...)
		putBuf(buf)
	}
	ret = append(ret, b.Buf...)
	putBuf(b.toPool)

	b.bufs = nil
	b.toPool = nil
	b.Buf = nil

	return ret
}
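// A minimal BuildBytes sketch. With a single chunk the chunk itself is
// returned and the reuse argument is ignored; it only avoids an allocation
// once the data spans several chunks and a copy is unavoidable:
//
//	var b Buffer
//	b.AppendString("hello")
//	data := b.BuildBytes() // single chunk: returned directly, no copy
//
// For a multi-chunk buffer, passing a scratch slice with cap >= b.Size()
// makes the copy land in that slice instead of a fresh allocation.
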
// readCloser drains a chain of chunks as an io.ReadCloser, releasing each
// chunk to the reuse pool once it has been fully consumed.
type readCloser struct {
	offset int
	bufs   [][]byte
}

func (r *readCloser) Read(p []byte) (n int, err error) {
	for _, buf := range r.bufs {
		// Copy as much as we can.
		x := copy(p[n:], buf[r.offset:])
		n += x // Increment how much we filled.

		// Did we empty the whole buffer?
		if r.offset+x == len(buf) {
			// On to the next buffer.
			r.offset = 0
			r.bufs = r.bufs[1:]

			// We can release this buffer.
			putBuf(buf)
		} else {
			r.offset += x
		}

		if n == len(p) {
			break
		}
	}
	// No buffers left or nothing read?
	if len(r.bufs) == 0 {
		err = io.EOF
	}
	return
}

func (r *readCloser) Close() error {
	// Release all remaining buffers.
	for _, buf := range r.bufs {
		putBuf(buf)
	}
	// In case Close gets called multiple times.
	r.bufs = nil
	return nil
}
// ReadCloser creates an io.ReadCloser with all the contents of the buffer.
func (b *Buffer) ReadCloser() io.ReadCloser {
	ret := &readCloser{0, append(b.bufs, b.Buf)}

	b.bufs = nil
	b.toPool = nil
	b.Buf = nil

	return ret
}
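// A minimal ReadCloser sketch: stream the contents without building one
// contiguous slice (e.g. as an http.Request body). Closing the reader
// releases any chunks that were not fully read:
//
//	var b Buffer
//	b.AppendString("hello")
//	rc := b.ReadCloser()
//	data, _ := io.ReadAll(rc) // "hello"
//	rc.Close()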