index.js 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. var Readable = require('readable-stream/readable');
  2. var isReadable = require('is-stream').readable;
  3. var util = require('util');
  /**
   * Attach one source stream to an ordered output stream.
   *
   * Must be called with `this` set to the output Readable. Chunks read from
   * `stream` are pushed straight to the output while `stream` is at the front
   * of `streams`; otherwise they are parked on `stream._buffer` until every
   * stream ahead of it has finished. When the last stream finishes, the output
   * is ended with `push(null)`. Errors from `stream` are re-emitted on the
   * output.
   *
   * @param {Array} streams - Shared queue of source streams that have not been
   *   fully emitted yet, in output order. `stream` is appended to it.
   * @param {Object} stream - Source stream; must satisfy `isReadable`. A
   *   `_buffer` array property is added to it.
   * @throws {Error} If `stream` is not a readable stream.
   */
  function addStream(streams, stream) {
    if (!isReadable(stream)) {
      throw new Error('All input streams must be readable');
    }

    var self = this;

    // Chunks received while an earlier stream still owns the output.
    stream._buffer = [];

    // Push everything parked on `source` to the output, oldest first.
    function flushBuffer(source) {
      while (source._buffer.length) {
        self.push(source._buffer.shift());
      }
    }

    stream.on('readable', function () {
      // read() must be called until it returns null: a single 'readable' event
      // can cover several buffered chunks, and the stream will neither signal
      // again nor emit 'end' while unread data remains. Comparing against null
      // (rather than truthiness) keeps falsy object-mode chunks such as 0.
      var chunk;
      while ((chunk = stream.read()) !== null) {
        if (stream === streams[0]) {
          self.push(chunk);
        } else {
          stream._buffer.push(chunk);
        }
      }
    });

    stream.on('end', function () {
      var head = streams[0];

      // Retire finished streams from the front of the queue. `endEmitted`
      // (not `ended`) is checked so a stream is only dropped once all of its
      // data has actually been read into `_buffer`.
      while (head && head._readableState.endEmitted) {
        flushBuffer(head);
        streams.shift();
        head = streams[0];
      }

      if (!streams.length) {
        self.push(null);
        return;
      }

      // The new front stream may have parked chunks while it was waiting;
      // emit them now so its later chunks cannot overtake them.
      flushBuffer(head);
    });

    stream.on('error', self.emit.bind(self, 'error'));

    streams.push(stream);
  }
  /**
   * Object-mode Readable that hands each given source stream to `addStream`,
   * in the order given, so their chunks are emitted in that order.
   *
   * Can be called with or without `new`.
   *
   * @param {Object|Array} [streams] - A single source stream, or an array of
   *   streams. Arrays nested one level deep are flattened. When there are no
   *   streams at all, the result ends immediately.
   * @param {Object} [options] - Options passed to the Readable constructor.
   *   `objectMode` is always forced to `true`; the caller's object is not
   *   modified.
   * @throws {Error} If any source is not a readable stream (raised by
   *   `addStream`).
   */
  function OrderedStreams(streams, options) {
    if (!(this instanceof OrderedStreams)) {
      return new OrderedStreams(streams, options);
    }

    // Work on a copy so forcing objectMode never alters the caller's object.
    var readableOptions = {};
    Object.keys(options || {}).forEach(function (key) {
      readableOptions[key] = options[key];
    });
    readableOptions.objectMode = true;

    Readable.call(this, readableOptions);

    var sources = streams || [];
    if (!Array.isArray(sources)) {
      sources = [sources];
    }

    // Flatten one level before the emptiness check, so input made only of
    // empty nested arrays (e.g. [[]]) still closes the stream instead of
    // leaving it open forever.
    var flattened = [];
    sources.forEach(function (item) {
      if (Array.isArray(item)) {
        item.forEach(function (nested) {
          flattened.push(nested);
        });
      } else {
        flattened.push(item);
      }
    });

    if (!flattened.length) {
      this.push(null); // no streams, close
      return;
    }

    // Every source shares one queue, which addStream uses to track ordering.
    var attach = addStream.bind(this, []);
    flattened.forEach(function (source) {
      attach(source);
    });
  }
  // OrderedStreams instances inherit from Readable.
  util.inherits(OrderedStreams, Readable);

  // _read is deliberately a no-op: chunks are pushed from the event handlers
  // that addStream installs on the source streams.
  OrderedStreams.prototype._read = function () {};

  // The constructor is the module's only export.
  module.exports = OrderedStreams;