New: Initial implementation based on vinyl-fs · gulpjs/lead@af7c2f6 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit af7c2f6

Browse files
committed
New: Initial implementation based on vinyl-fs
1 parent bd9992e commit af7c2f6

File tree

4 files changed

+250
-0
lines changed

4 files changed

+250
-0
lines changed

index.js

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
'use strict';
2+
3+
var Writable = require('flush-write-stream');
4+
5+
/**
 * Counts the handlers currently registered on `stream` for the event `evt`.
 *
 * @param {Object} stream - Emitter exposing `listenerCount(evt)` and/or `listeners(evt)`.
 * @param {string} evt - Name of the event to inspect.
 * @returns {number} Number of handlers attached for `evt`.
 */
function listenerCount(stream, evt) {
  // Prefer the emitter's own counter when it has one: it does not need to
  // build a copy of the listener array just to measure it.
  if (typeof stream.listenerCount === 'function') {
    return stream.listenerCount(evt);
  }

  // Fallback for emitters that only implement `listeners()`.
  return stream.listeners(evt).length;
}
8+
9+
/**
 * Reports whether `stream` has at least one handler registered for either
 * the 'readable' or the 'data' event.
 *
 * @param {Object} stream - Emitter to inspect.
 * @returns {boolean} `true` when a 'readable' or 'data' handler is attached.
 */
function hasListeners(stream) {
  if (listenerCount(stream, 'readable')) {
    return true;
  }

  return !!listenerCount(stream, 'data');
}
12+
13+
/**
 * Write handler for the sink stream: the chunk is ignored and completion is
 * signalled immediately, so nothing is retained.
 *
 * @param {*} file - Incoming chunk (unused).
 * @param {string} enc - Chunk encoding (unused).
 * @param {Function} callback - Invoked with no arguments once called.
 */
function sinker(file, enc, callback) {
  callback();
}
16+
17+
/**
 * Attaches a discard-everything writable to `stream` whenever nobody is
 * listening for 'readable' or 'data' on it, and detaches that writable again
 * when such listeners are present.
 *
 * @param {Object} stream - Stream to wrap; its `_readableState.objectMode`
 *   is read to configure the sink.
 * @returns {Object} The same `stream` instance that was passed in.
 */
function sink(stream) {
  // Tracks whether `sinkStream` is currently piped from `stream`.
  var sinkAdded = false;

  // The sink mirrors the object mode of the wrapped stream's readable side.
  var sinkOptions = {
    objectMode: stream._readableState.objectMode,
  };

  var sinkStream = new Writable(sinkOptions, sinker);

  // Pipes `stream` into the sink unless it is already piped or the stream
  // has 'readable'/'data' listeners.
  function addSink() {
    if (sinkAdded) {
      return;
    }

    if (hasListeners(stream)) {
      return;
    }

    sinkAdded = true;
    stream.pipe(sinkStream);
  }

  // Reacts to 'readable'/'data' listener changes only: when the stream has
  // such listeners at that moment, the sink is unpiped.
  function removeSink(evt) {
    if (evt !== 'readable' && evt !== 'data') {
      return;
    }

    if (hasListeners(stream)) {
      sinkAdded = false;
      stream.unpipe(sinkStream);
    }
  }

  // NOTE(review): registration order matters here; on 'removeListener',
  // `removeSink` runs before `addSink`. Keep this order when editing.
  stream.on('newListener', removeSink);
  stream.on('removeListener', removeSink);
  stream.on('removeListener', addSink);

  // Sink the stream to start flowing
  // Do this on nextTick, it will flow at slowest speed of piped streams
  process.nextTick(addSink);

  return stream;
}
60+
61+
// Public API: the module's only export is the `sink` function.
module.exports = sink;

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"coveralls": "npm run cover && istanbul-coveralls"
2525
},
2626
"dependencies": {
27+
"flush-write-stream": "^1.0.2"
2728
},
2829
"devDependencies": {
2930
"eslint": "^1.10.3",

test/.eslintrc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"extends": "gulp/test"
3+
}

test/index.js

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
'use strict';
2+
3+
var expect = require('expect');
4+
var miss = require('mississippi');
5+
6+
var sink = require('../');
7+
8+
var to = miss.to;
9+
var from = miss.from;
10+
var pipe = miss.pipe;
11+
var through = miss.through;
12+
13+
// Placeholder handler: takes no action and returns undefined.
function noop() {
  // Intentionally empty
}
14+
15+
/**
 * Builds an object-mode through stream that forwards every item unchanged
 * and, on flush, asserts that exactly `value` items passed through.
 *
 * @param {number} value - Expected number of items.
 * @returns {Object} Whatever `through.obj` returns for the two handlers.
 */
function count(value) {
  // Named `seen` so it does not shadow the enclosing function's name.
  var seen = 0;

  function tally(file, enc, cb) {
    seen++;
    cb(null, file);
  }

  function verify(cb) {
    expect(seen).toEqual(value);
    cb();
  }

  return through.obj(tally, verify);
}
25+
26+
/**
 * Builds an object-mode writable (via `to.obj`) that acknowledges each item
 * only after a 250 ms timer, and asserts on flush that exactly `value`
 * items were written.
 *
 * @param {number} value - Expected number of items.
 * @returns {Object} Whatever `to.obj` returns for the two handlers.
 */
function slowCount(value) {
  // Named `seen` so it does not shadow the sibling `count` helper.
  var seen = 0;

  function write(file, enc, cb) {
    // Counted on arrival; the acknowledgement is what gets delayed.
    seen++;

    setTimeout(function() {
      cb(null, file);
    }, 250);
  }

  function flush(cb) {
    expect(seen).toEqual(value);
    cb();
  }

  return to.obj(write, flush);
}
39+
40+
// Behavioural suite for the exported `sink` wrapper.
describe('lead', function() {

  it('respects objectMode of wrapped stream', function(done) {
    // `through()` is created without object mode here
    var write = sink(through());

    function assert(err) {
      // Forced an object through a non-object stream
      expect(err).toExist();
      done();
    }

    pipe([
      from.obj([{}]),
      // Must be in the Writable position to test this
      // So concat-stream cannot be used
      write,
    ], assert);
  });

  it('does not get clogged by highWaterMark', function(done) {
    var expectedCount = 17;
    var highwatermarkObjs = [];
    for (var idx = 0; idx < expectedCount; idx++) {
      highwatermarkObjs.push({});
    }

    var write = sink(through.obj());

    pipe([
      from.obj(highwatermarkObjs),
      count(expectedCount),
      // Must be in the Writable position to test this
      // So concat-stream cannot be used
      write,
    ], done);
  });

  it('allows backpressure when piped to another, slower stream', function(done) {
    // The slow consumer delays every item, so the default timeout is raised
    this.timeout(20000);

    var expectedCount = 24;
    var highwatermarkObjs = [];
    for (var idx = 0; idx < expectedCount; idx++) {
      highwatermarkObjs.push({});
    }

    var write = sink(through.obj());

    pipe([
      from.obj(highwatermarkObjs),
      count(expectedCount),
      write,
      slowCount(expectedCount),
    ], done);
  });

  it('respects readable listeners on wrapped stream', function(done) {
    var write = sink(through.obj());

    var readables = 0;
    write.on('readable', function() {
      var data = write.read();

      // Only count reads that actually produced an item
      if (data != null) {
        readables++;
      }
    });

    function assert(err) {
      expect(readables).toEqual(1);
      done(err);
    }

    pipe([
      from.obj([{}]),
      write,
    ], assert);
  });

  it('respects data listeners on wrapped stream', function(done) {
    var write = sink(through.obj());

    var datas = 0;
    write.on('data', function() {
      datas++;
    });

    function assert(err) {
      expect(datas).toEqual(1);
      done(err);
    }

    pipe([
      from.obj([{}]),
      write,
    ], assert);
  });

  it('sinks the stream if all the readable event handlers are removed', function(done) {
    var expectedCount = 17;
    var highwatermarkObjs = [];
    for (var idx = 0; idx < expectedCount; idx++) {
      highwatermarkObjs.push({});
    }

    var write = sink(through.obj());

    write.on('readable', noop);

    pipe([
      from.obj(highwatermarkObjs),
      count(expectedCount),
      // Must be in the Writable position to test this
      // So concat-stream cannot be used
      write,
    ], done);

    // Drop the only 'readable' handler after the pipeline has been set up
    process.nextTick(function() {
      write.removeListener('readable', noop);
    });
  });

  it('sinks the stream if all the data event handlers are removed', function(done) {
    var expectedCount = 17;
    var highwatermarkObjs = [];
    for (var idx = 0; idx < expectedCount; idx++) {
      highwatermarkObjs.push({});
    }

    var write = sink(through.obj());

    write.on('data', noop);

    pipe([
      from.obj(highwatermarkObjs),
      count(expectedCount),
      // Must be in the Writable position to test this
      // So concat-stream cannot be used
      write,
    ], done);

    // Drop the only 'data' handler after the pipeline has been set up
    process.nextTick(function() {
      write.removeListener('data', noop);
    });
  });
});

0 commit comments

Comments
 (0)
0