Initial commit
This commit is contained in:
90
node_modules/execa/lib/convert/writable.js
generated
vendored
Normal file
90
node_modules/execa/lib/convert/writable.js
generated
vendored
Normal file
@@ -0,0 +1,90 @@
|
||||
import {Writable} from 'node:stream';
|
||||
import {callbackify} from 'node:util';
|
||||
import {getToStream} from '../arguments/fd-options.js';
|
||||
import {addConcurrentStream, waitForConcurrentStreams} from './concurrent.js';
|
||||
import {
|
||||
safeWaitForSubprocessStdout,
|
||||
waitForSubprocessStdin,
|
||||
waitForSubprocess,
|
||||
destroyOtherStream,
|
||||
} from './shared.js';
|
||||
|
||||
// Create a `Writable` stream that forwards to `stdin` and awaits the subprocess
|
||||
export const createWritable = ({subprocess, concurrentStreams}, {to} = {}) => {
|
||||
const {subprocessStdin, waitWritableFinal, waitWritableDestroy} = getSubprocessStdin(subprocess, to, concurrentStreams);
|
||||
const writable = new Writable({
|
||||
...getWritableMethods(subprocessStdin, subprocess, waitWritableFinal),
|
||||
destroy: callbackify(onWritableDestroy.bind(undefined, {
|
||||
subprocessStdin,
|
||||
subprocess,
|
||||
waitWritableFinal,
|
||||
waitWritableDestroy,
|
||||
})),
|
||||
highWaterMark: subprocessStdin.writableHighWaterMark,
|
||||
objectMode: subprocessStdin.writableObjectMode,
|
||||
});
|
||||
onStdinFinished(subprocessStdin, writable);
|
||||
return writable;
|
||||
};
|
||||
|
||||
// Retrieve `stdin` (or other stream depending on `to`)
|
||||
export const getSubprocessStdin = (subprocess, to, concurrentStreams) => {
|
||||
const subprocessStdin = getToStream(subprocess, to);
|
||||
const waitWritableFinal = addConcurrentStream(concurrentStreams, subprocessStdin, 'writableFinal');
|
||||
const waitWritableDestroy = addConcurrentStream(concurrentStreams, subprocessStdin, 'writableDestroy');
|
||||
return {subprocessStdin, waitWritableFinal, waitWritableDestroy};
|
||||
};
|
||||
|
||||
export const getWritableMethods = (subprocessStdin, subprocess, waitWritableFinal) => ({
|
||||
write: onWrite.bind(undefined, subprocessStdin),
|
||||
final: callbackify(onWritableFinal.bind(undefined, subprocessStdin, subprocess, waitWritableFinal)),
|
||||
});
|
||||
|
||||
// Forwards data from `writable` to `stdin`
|
||||
const onWrite = (subprocessStdin, chunk, encoding, done) => {
|
||||
if (subprocessStdin.write(chunk, encoding)) {
|
||||
done();
|
||||
} else {
|
||||
subprocessStdin.once('drain', done);
|
||||
}
|
||||
};
|
||||
|
||||
// Ensures that the writable `final` and readable `end` events awaits the subprocess.
|
||||
// Like this, any subprocess failure is propagated as a stream `error` event, instead of being lost.
|
||||
// The user does not need to `await` the subprocess anymore, but now needs to await the stream completion or error.
|
||||
// When multiple writables are targeting the same stream, they wait for each other, unless the subprocess ends first.
|
||||
const onWritableFinal = async (subprocessStdin, subprocess, waitWritableFinal) => {
|
||||
if (await waitForConcurrentStreams(waitWritableFinal, subprocess)) {
|
||||
if (subprocessStdin.writable) {
|
||||
subprocessStdin.end();
|
||||
}
|
||||
|
||||
await subprocess;
|
||||
}
|
||||
};
|
||||
|
||||
// When `subprocess.stdin` ends/aborts/errors, do the same on `writable`.
|
||||
export const onStdinFinished = async (subprocessStdin, writable, subprocessStdout) => {
|
||||
try {
|
||||
await waitForSubprocessStdin(subprocessStdin);
|
||||
if (writable.writable) {
|
||||
writable.end();
|
||||
}
|
||||
} catch (error) {
|
||||
await safeWaitForSubprocessStdout(subprocessStdout);
|
||||
destroyOtherWritable(writable, error);
|
||||
}
|
||||
};
|
||||
|
||||
// When `writable` aborts/errors, do the same on `subprocess.stdin`
|
||||
export const onWritableDestroy = async ({subprocessStdin, subprocess, waitWritableFinal, waitWritableDestroy}, error) => {
|
||||
await waitForConcurrentStreams(waitWritableFinal, subprocess);
|
||||
if (await waitForConcurrentStreams(waitWritableDestroy, subprocess)) {
|
||||
destroyOtherWritable(subprocessStdin, error);
|
||||
await waitForSubprocess(subprocess, error);
|
||||
}
|
||||
};
|
||||
|
||||
const destroyOtherWritable = (stream, error) => {
|
||||
destroyOtherStream(stream, stream.writable, error);
|
||||
};
|
||||
Reference in New Issue
Block a user