Skip to content

Commit

Permalink
feat: rework promise interface
Browse files Browse the repository at this point in the history
Reworks the promise `.then` to be greatly simplified.

As part of this, error handling has also been improved in a few ways:

- Timeouts are now handled by an `AbortSignal` rather than the built-in
  `timeout` option, as there is currently no way to know if a proc was
killed due to a timeout.
- The last error is stored until we `await` the result, at which point,
  we will throw the stored error
- We wait for the process to close before trying to read the output, in
  case it threw
  • Loading branch information
43081j committed Jun 16, 2024
1 parent ca8ba53 commit c5d9d6b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 22 deletions.
72 changes: 56 additions & 16 deletions src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@ function normaliseCommandAndArgs(
};
}

function combineSignals(signals: Iterable<AbortSignal>): AbortSignal {
const controller = new AbortController();

for (const signal of signals) {
if (signal.aborted) {
controller.abort();
return signal;
}

const onAbort = (): void => {
controller.abort(signal.reason);
};
signal.addEventListener('abort', onAbort, {
signal: controller.signal
});
}

return controller.signal;
}

export class ExecProcess implements Result {
protected _process?: ChildProcess;
protected _aborted: boolean = false;
Expand All @@ -78,6 +98,7 @@ export class ExecProcess implements Result {
protected _args: string[];
protected _resolveClose?: () => void;
protected _processClosed: Promise<void>;
protected _thrownError?: Error;

public get process(): ChildProcess | undefined {
return this._process;
Expand Down Expand Up @@ -139,6 +160,10 @@ export class ExecProcess implements Result {
return;
}

if (this._thrownError) {
throw this._thrownError;
}

const sources: Readable[] = [];
if (proc.stderr) {
sources.push(proc.stderr);
Expand All @@ -158,10 +183,7 @@ export class ExecProcess implements Result {
await this._processClosed;
}

public async then<TResult1 = Output, TResult2 = never>(
onfulfilled?: ((value: Output) => TResult1 | PromiseLike<TResult1>) | null,
_onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null
): Promise<TResult1 | TResult2> {
protected async _waitForOutput(): Promise<Output> {
if (this._options?.stdin) {
await this._options.stdin;
}
Expand All @@ -172,22 +194,32 @@ export class ExecProcess implements Result {
throw new Error('No process was started');
}

await this._processClosed;

proc.removeAllListeners();

if (this._thrownError) {
throw this._thrownError;
}

const [stderr, stdout] = await Promise.all([
proc.stderr && readStreamAsString(proc.stderr),
proc.stdout && readStreamAsString(proc.stdout),
this._processClosed
proc.stdout && readStreamAsString(proc.stdout)
]);

const result: Output = {
stderr: stderr ?? '',
stdout: stdout ?? ''
};

if (onfulfilled) {
return onfulfilled(result);
} else {
return result as TResult1;
}
return result;
}

public then<TResult1 = Output, TResult2 = never>(
onfulfilled?: ((value: Output) => TResult1 | PromiseLike<TResult1>) | null,
onrejected?: ((reason: unknown) => TResult2 | PromiseLike<TResult2>) | null
): Promise<TResult1 | TResult2> {
return this._waitForOutput().then(onfulfilled, onrejected);
}

public spawn(): void {
Expand All @@ -198,18 +230,24 @@ export class ExecProcess implements Result {
...options.nodeOptions
};

const signals: AbortSignal[] = [];

if (options.timeout !== undefined) {
nodeOptions.timeout = options.timeout;
signals.push(AbortSignal.timeout(options.timeout));
}

if (options.signal !== undefined) {
nodeOptions.signal = options.signal;
signals.push(options.signal);
}

if (options.persist === true) {
nodeOptions.detached = true;
}

if (signals.length > 0) {
nodeOptions.signal = combineSignals(signals);
}

nodeOptions.env = computeEnv(cwd, nodeOptions.env);

const {command: normalisedCommand, args: normalisedArgs} =
Expand All @@ -230,12 +268,14 @@ export class ExecProcess implements Result {
}

protected _onError = (err: Error): void => {
if (err.name === 'AbortError') {
if (
err.name === 'AbortError' &&
(!(err.cause instanceof Error) || err.cause.name !== 'TimeoutError')
) {
this._aborted = true;
return;
}
// TODO emit this somewhere
throw err;
this._thrownError = err;
};

protected _onClose = (): void => {
Expand Down
7 changes: 1 addition & 6 deletions src/test/stream_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ test('readStreamAsString', async (t) => {
this.destroy(streamError);
}
});
try {
await readStreamAsString(stream);
assert.fail('expected to throw');
} catch (err) {
assert.equal(err, streamError);
}
await assert.rejects(readStreamAsString(stream), streamError);
});

await t.test('resolves to concatenated data', async () => {
Expand Down

0 comments on commit c5d9d6b

Please sign in to comment.