_stream_writable.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. // A bit simpler than readable streams.
  2. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  3. // the drain event emission and buffering.
  4. 'use strict';
  5. module.exports = Writable;
  6. /*<replacement>*/
  7. var processNextTick = require('process-nextick-args');
  8. /*</replacement>*/
  9. /*<replacement>*/
  10. var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
  11. /*</replacement>*/
  12. /*<replacement>*/
  13. var Buffer = require('buffer').Buffer;
  14. /*</replacement>*/
  15. Writable.WritableState = WritableState;
  16. /*<replacement>*/
  17. var util = require('core-util-is');
  18. util.inherits = require('inherits');
  19. /*</replacement>*/
  20. /*<replacement>*/
  21. var internalUtil = {
  22. deprecate: require('util-deprecate')
  23. };
  24. /*</replacement>*/
  25. /*<replacement>*/
  26. var Stream;
  27. (function () {
  28. try {
  29. Stream = require('st' + 'ream');
  30. } catch (_) {} finally {
  31. if (!Stream) Stream = require('events').EventEmitter;
  32. }
  33. })();
  34. /*</replacement>*/
  35. var Buffer = require('buffer').Buffer;
  36. util.inherits(Writable, Stream);
  37. function nop() {}
  38. function WriteReq(chunk, encoding, cb) {
  39. this.chunk = chunk;
  40. this.encoding = encoding;
  41. this.callback = cb;
  42. this.next = null;
  43. }
  44. var Duplex;
  45. function WritableState(options, stream) {
  46. Duplex = Duplex || require('./_stream_duplex');
  47. options = options || {};
  48. // object stream flag to indicate whether or not this stream
  49. // contains buffers or objects.
  50. this.objectMode = !!options.objectMode;
  51. if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
  52. // the point at which write() starts returning false
  53. // Note: 0 is a valid value, means that we always return false if
  54. // the entire buffer is not flushed immediately on write()
  55. var hwm = options.highWaterMark;
  56. var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  57. this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
  58. // cast to ints.
  59. this.highWaterMark = ~ ~this.highWaterMark;
  60. this.needDrain = false;
  61. // at the start of calling end()
  62. this.ending = false;
  63. // when end() has been called, and returned
  64. this.ended = false;
  65. // when 'finish' is emitted
  66. this.finished = false;
  67. // should we decode strings into buffers before passing to _write?
  68. // this is here so that some node-core streams can optimize string
  69. // handling at a lower level.
  70. var noDecode = options.decodeStrings === false;
  71. this.decodeStrings = !noDecode;
  72. // Crypto is kind of old and crusty. Historically, its default string
  73. // encoding is 'binary' so we have to make this configurable.
  74. // Everything else in the universe uses 'utf8', though.
  75. this.defaultEncoding = options.defaultEncoding || 'utf8';
  76. // not an actual buffer we keep track of, but a measurement
  77. // of how much we're waiting to get pushed to some underlying
  78. // socket or file.
  79. this.length = 0;
  80. // a flag to see when we're in the middle of a write.
  81. this.writing = false;
  82. // when true all writes will be buffered until .uncork() call
  83. this.corked = 0;
  84. // a flag to be able to tell if the onwrite cb is called immediately,
  85. // or on a later tick. We set this to true at first, because any
  86. // actions that shouldn't happen until "later" should generally also
  87. // not happen before the first write call.
  88. this.sync = true;
  89. // a flag to know if we're processing previously buffered items, which
  90. // may call the _write() callback in the same tick, so that we don't
  91. // end up in an overlapped onwrite situation.
  92. this.bufferProcessing = false;
  93. // the callback that's passed to _write(chunk,cb)
  94. this.onwrite = function (er) {
  95. onwrite(stream, er);
  96. };
  97. // the callback that the user supplies to write(chunk,encoding,cb)
  98. this.writecb = null;
  99. // the amount that is being written when _write is called.
  100. this.writelen = 0;
  101. this.bufferedRequest = null;
  102. this.lastBufferedRequest = null;
  103. // number of pending user-supplied write callbacks
  104. // this must be 0 before 'finish' can be emitted
  105. this.pendingcb = 0;
  106. // emit prefinish if the only thing we're waiting for is _write cbs
  107. // This is relevant for synchronous Transform streams
  108. this.prefinished = false;
  109. // True if the error was already emitted and should not be thrown again
  110. this.errorEmitted = false;
  111. // count buffered requests
  112. this.bufferedRequestCount = 0;
  113. // create the two objects needed to store the corked requests
  114. // they are not a linked list, as no new elements are inserted in there
  115. this.corkedRequestsFree = new CorkedRequest(this);
  116. this.corkedRequestsFree.next = new CorkedRequest(this);
  117. }
  118. WritableState.prototype.getBuffer = function writableStateGetBuffer() {
  119. var current = this.bufferedRequest;
  120. var out = [];
  121. while (current) {
  122. out.push(current);
  123. current = current.next;
  124. }
  125. return out;
  126. };
  127. (function () {
  128. try {
  129. Object.defineProperty(WritableState.prototype, 'buffer', {
  130. get: internalUtil.deprecate(function () {
  131. return this.getBuffer();
  132. }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
  133. });
  134. } catch (_) {}
  135. })();
  136. var Duplex;
  137. function Writable(options) {
  138. Duplex = Duplex || require('./_stream_duplex');
  139. // Writable ctor is applied to Duplexes, though they're not
  140. // instanceof Writable, they're instanceof Readable.
  141. if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
  142. this._writableState = new WritableState(options, this);
  143. // legacy.
  144. this.writable = true;
  145. if (options) {
  146. if (typeof options.write === 'function') this._write = options.write;
  147. if (typeof options.writev === 'function') this._writev = options.writev;
  148. }
  149. Stream.call(this);
  150. }
  151. // Otherwise people can pipe Writable streams, which is just wrong.
  152. Writable.prototype.pipe = function () {
  153. this.emit('error', new Error('Cannot pipe. Not readable.'));
  154. };
  155. function writeAfterEnd(stream, cb) {
  156. var er = new Error('write after end');
  157. // TODO: defer error events consistently everywhere, not just the cb
  158. stream.emit('error', er);
  159. processNextTick(cb, er);
  160. }
  161. // If we get something that is not a buffer, string, null, or undefined,
  162. // and we're not in objectMode, then that's an error.
  163. // Otherwise stream chunks are all considered to be of length=1, and the
  164. // watermarks determine how many objects to keep in the buffer, rather than
  165. // how many bytes or characters.
  166. function validChunk(stream, state, chunk, cb) {
  167. var valid = true;
  168. if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== null && chunk !== undefined && !state.objectMode) {
  169. var er = new TypeError('Invalid non-string/buffer chunk');
  170. stream.emit('error', er);
  171. processNextTick(cb, er);
  172. valid = false;
  173. }
  174. return valid;
  175. }
  176. Writable.prototype.write = function (chunk, encoding, cb) {
  177. var state = this._writableState;
  178. var ret = false;
  179. if (typeof encoding === 'function') {
  180. cb = encoding;
  181. encoding = null;
  182. }
  183. if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
  184. if (typeof cb !== 'function') cb = nop;
  185. if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
  186. state.pendingcb++;
  187. ret = writeOrBuffer(this, state, chunk, encoding, cb);
  188. }
  189. return ret;
  190. };
  191. Writable.prototype.cork = function () {
  192. var state = this._writableState;
  193. state.corked++;
  194. };
  195. Writable.prototype.uncork = function () {
  196. var state = this._writableState;
  197. if (state.corked) {
  198. state.corked--;
  199. if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
  200. }
  201. };
  202. Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  203. // node::ParseEncoding() requires lower case.
  204. if (typeof encoding === 'string') encoding = encoding.toLowerCase();
  205. if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
  206. this._writableState.defaultEncoding = encoding;
  207. };
  208. function decodeChunk(state, chunk, encoding) {
  209. if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
  210. chunk = new Buffer(chunk, encoding);
  211. }
  212. return chunk;
  213. }
  214. // if we're already writing something, then just put this
  215. // in the queue, and wait our turn. Otherwise, call _write
  216. // If we return false, then we need a drain event, so set that flag.
  217. function writeOrBuffer(stream, state, chunk, encoding, cb) {
  218. chunk = decodeChunk(state, chunk, encoding);
  219. if (Buffer.isBuffer(chunk)) encoding = 'buffer';
  220. var len = state.objectMode ? 1 : chunk.length;
  221. state.length += len;
  222. var ret = state.length < state.highWaterMark;
  223. // we must ensure that previous needDrain will not be reset to false.
  224. if (!ret) state.needDrain = true;
  225. if (state.writing || state.corked) {
  226. var last = state.lastBufferedRequest;
  227. state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
  228. if (last) {
  229. last.next = state.lastBufferedRequest;
  230. } else {
  231. state.bufferedRequest = state.lastBufferedRequest;
  232. }
  233. state.bufferedRequestCount += 1;
  234. } else {
  235. doWrite(stream, state, false, len, chunk, encoding, cb);
  236. }
  237. return ret;
  238. }
  239. function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  240. state.writelen = len;
  241. state.writecb = cb;
  242. state.writing = true;
  243. state.sync = true;
  244. if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
  245. state.sync = false;
  246. }
  247. function onwriteError(stream, state, sync, er, cb) {
  248. --state.pendingcb;
  249. if (sync) processNextTick(cb, er);else cb(er);
  250. stream._writableState.errorEmitted = true;
  251. stream.emit('error', er);
  252. }
  253. function onwriteStateUpdate(state) {
  254. state.writing = false;
  255. state.writecb = null;
  256. state.length -= state.writelen;
  257. state.writelen = 0;
  258. }
  259. function onwrite(stream, er) {
  260. var state = stream._writableState;
  261. var sync = state.sync;
  262. var cb = state.writecb;
  263. onwriteStateUpdate(state);
  264. if (er) onwriteError(stream, state, sync, er, cb);else {
  265. // Check if we're actually ready to finish, but don't emit yet
  266. var finished = needFinish(state);
  267. if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
  268. clearBuffer(stream, state);
  269. }
  270. if (sync) {
  271. /*<replacement>*/
  272. asyncWrite(afterWrite, stream, state, finished, cb);
  273. /*</replacement>*/
  274. } else {
  275. afterWrite(stream, state, finished, cb);
  276. }
  277. }
  278. }
  279. function afterWrite(stream, state, finished, cb) {
  280. if (!finished) onwriteDrain(stream, state);
  281. state.pendingcb--;
  282. cb();
  283. finishMaybe(stream, state);
  284. }
  285. // Must force callback to be called on nextTick, so that we don't
  286. // emit 'drain' before the write() consumer gets the 'false' return
  287. // value, and has a chance to attach a 'drain' listener.
  288. function onwriteDrain(stream, state) {
  289. if (state.length === 0 && state.needDrain) {
  290. state.needDrain = false;
  291. stream.emit('drain');
  292. }
  293. }
  294. // if there's something in the buffer waiting, then process it
  295. function clearBuffer(stream, state) {
  296. state.bufferProcessing = true;
  297. var entry = state.bufferedRequest;
  298. if (stream._writev && entry && entry.next) {
  299. // Fast case, write everything using _writev()
  300. var l = state.bufferedRequestCount;
  301. var buffer = new Array(l);
  302. var holder = state.corkedRequestsFree;
  303. holder.entry = entry;
  304. var count = 0;
  305. while (entry) {
  306. buffer[count] = entry;
  307. entry = entry.next;
  308. count += 1;
  309. }
  310. doWrite(stream, state, true, state.length, buffer, '', holder.finish);
  311. // doWrite is always async, defer these to save a bit of time
  312. // as the hot path ends with doWrite
  313. state.pendingcb++;
  314. state.lastBufferedRequest = null;
  315. state.corkedRequestsFree = holder.next;
  316. holder.next = null;
  317. } else {
  318. // Slow case, write chunks one-by-one
  319. while (entry) {
  320. var chunk = entry.chunk;
  321. var encoding = entry.encoding;
  322. var cb = entry.callback;
  323. var len = state.objectMode ? 1 : chunk.length;
  324. doWrite(stream, state, false, len, chunk, encoding, cb);
  325. entry = entry.next;
  326. // if we didn't call the onwrite immediately, then
  327. // it means that we need to wait until it does.
  328. // also, that means that the chunk and cb are currently
  329. // being processed, so move the buffer counter past them.
  330. if (state.writing) {
  331. break;
  332. }
  333. }
  334. if (entry === null) state.lastBufferedRequest = null;
  335. }
  336. state.bufferedRequestCount = 0;
  337. state.bufferedRequest = entry;
  338. state.bufferProcessing = false;
  339. }
  340. Writable.prototype._write = function (chunk, encoding, cb) {
  341. cb(new Error('not implemented'));
  342. };
  343. Writable.prototype._writev = null;
  344. Writable.prototype.end = function (chunk, encoding, cb) {
  345. var state = this._writableState;
  346. if (typeof chunk === 'function') {
  347. cb = chunk;
  348. chunk = null;
  349. encoding = null;
  350. } else if (typeof encoding === 'function') {
  351. cb = encoding;
  352. encoding = null;
  353. }
  354. if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
  355. // .end() fully uncorks
  356. if (state.corked) {
  357. state.corked = 1;
  358. this.uncork();
  359. }
  360. // ignore unnecessary end() calls.
  361. if (!state.ending && !state.finished) endWritable(this, state, cb);
  362. };
  363. function needFinish(state) {
  364. return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
  365. }
  366. function prefinish(stream, state) {
  367. if (!state.prefinished) {
  368. state.prefinished = true;
  369. stream.emit('prefinish');
  370. }
  371. }
  372. function finishMaybe(stream, state) {
  373. var need = needFinish(state);
  374. if (need) {
  375. if (state.pendingcb === 0) {
  376. prefinish(stream, state);
  377. state.finished = true;
  378. stream.emit('finish');
  379. } else {
  380. prefinish(stream, state);
  381. }
  382. }
  383. return need;
  384. }
  385. function endWritable(stream, state, cb) {
  386. state.ending = true;
  387. finishMaybe(stream, state);
  388. if (cb) {
  389. if (state.finished) processNextTick(cb);else stream.once('finish', cb);
  390. }
  391. state.ended = true;
  392. stream.writable = false;
  393. }
  394. // It seems a linked list but it is not
  395. // there will be only 2 of these for each stream
  396. function CorkedRequest(state) {
  397. var _this = this;
  398. this.next = null;
  399. this.entry = null;
  400. this.finish = function (err) {
  401. var entry = _this.entry;
  402. _this.entry = null;
  403. while (entry) {
  404. var cb = entry.callback;
  405. state.pendingcb--;
  406. cb(err);
  407. entry = entry.next;
  408. }
  409. if (state.corkedRequestsFree) {
  410. state.corkedRequestsFree.next = _this;
  411. } else {
  412. state.corkedRequestsFree = _this;
  413. }
  414. };
  415. }