Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ Creates an instance of `PythonShell` and starts the Python process
- `pythonOptions`: Array of option switches to pass to "python"
- `scriptPath`: The default path where to look for scripts. Default is the current working directory.
- `args`: Array of arguments to pass to the script
- `timeout`: Maximum execution time in milliseconds before the process is killed and the run rejects
- `stdoutSplitter`: splits stdout into chunks, defaulting to splitting into newline-seperated lines
- `stderrSplitter`: splits stderr into chunks, defaulting to splitting into newline-seperated lines

Expand Down
124 changes: 115 additions & 9 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,18 @@ export interface Options extends SpawnOptions {
* arguments to your program
*/
args?: string[];
/**
* maximum execution time in milliseconds before the process is killed
*/
timeout?: number;
}

export class PythonShellError extends Error {
traceback: string | Buffer;
exitCode?: number;
exitSignal?: string;
timeout?: number;
parserError?: Error;
}

export class PythonShellErrorWithLogs extends PythonShellError {
Expand Down Expand Up @@ -122,6 +129,11 @@ export class PythonShell extends EventEmitter {
exitCode: number;
private stderrHasEnded: boolean;
private stdoutHasEnded: boolean;
private timeoutId: NodeJS.Timeout;
private timeout: number;
private timedOut: boolean;
private timeoutParserError: Error;
private finished: boolean;
private _remaining: string;
private _endCallback: (
err: PythonShellError,
Expand Down Expand Up @@ -171,7 +183,17 @@ export class PythonShell extends EventEmitter {
let errorData = '';
EventEmitter.call(this);

function flushSplitter(stream: Readable, splitter: Transform) {
if (stream && splitter) {
stream.unpipe(splitter);
splitter.end();
}
}

options = <Options>extend({}, PythonShell.defaultOptions, options);
let timeout = options.timeout;
let spawnOptions = <Options>extend({}, options);
delete spawnOptions.timeout;
let pythonPath: string;
if (!options.pythonPath) {
pythonPath = PythonShell.defaultPythonPath;
Expand All @@ -187,7 +209,38 @@ export class PythonShell extends EventEmitter {
// We don't expect users to ever format stderr as JSON so we default to text mode
this.stderrParser = resolve('parse', options.stderrParser || 'text');
this.terminated = false;
this.childProcess = spawn(pythonPath, this.command, options);
this.childProcess = spawn(pythonPath, this.command, spawnOptions);

if (timeout > 0) {
this.timeout = timeout;
this.timeoutId = setTimeout(() => {
let killSignal = options.killSignal || 'SIGTERM';
self.timedOut = true;
if (self.exitCode == null && self.exitSignal == null) {
self.kill(killSignal);
if (killSignal !== 'SIGKILL') {
let forceKillTimer = setTimeout(() => {
if (self.exitCode == null && self.exitSignal == null) {
self.childProcess.kill('SIGKILL');
}
}, 100);
forceKillTimer.unref && forceKillTimer.unref();
}
}
try {
flushSplitter(self.stdout, stdoutSplitter);
flushSplitter(self.stderr, stderrSplitter);
} catch (err) {
self.timeoutParserError = err;
} finally {
self.stdoutHasEnded = true;
self.stderrHasEnded = true;
self.stdout && self.stdout.destroy();
self.stderr && self.stderr.destroy();
}
terminateIfNeeded();
}, timeout);
}

['stdout', 'stdin', 'stderr'].forEach(function (name) {
self[name] = self.childProcess[name];
Expand All @@ -205,7 +258,15 @@ export class PythonShell extends EventEmitter {
// note that setting the encoding turns the chunk into a string
stdoutSplitter.setEncoding(options.encoding || 'utf8');
this.stdout.pipe(stdoutSplitter).on('data', (chunk: string) => {
this.emit('message', self.parser(chunk));
try {
this.emit('message', self.parser(chunk));
} catch (err) {
if (self.timedOut) {
self.timeoutParserError = err;
} else {
throw err;
}
}
});
}

Expand All @@ -215,7 +276,15 @@ export class PythonShell extends EventEmitter {
// note that setting the encoding turns the chunk into a string
stderrSplitter.setEncoding(options.encoding || 'utf8');
this.stderr.pipe(stderrSplitter).on('data', (chunk: string) => {
this.emit('stderr', self.stderrParser(chunk));
try {
this.emit('stderr', self.stderrParser(chunk));
} catch (err) {
if (self.timedOut) {
self.timeoutParserError = err;
} else {
throw err;
}
}
});
}

Expand All @@ -241,6 +310,10 @@ export class PythonShell extends EventEmitter {
}

this.childProcess.on('error', function (err: NodeJS.ErrnoException) {
if (self.timeoutId) {
clearTimeout(self.timeoutId);
self.timeoutId = null;
}
self.emit('error', err);
});
this.childProcess.on('exit', function (code, signal) {
Expand All @@ -250,15 +323,47 @@ export class PythonShell extends EventEmitter {
});

function terminateIfNeeded() {
if (self.finished) {
return;
}

if (
!self.stderrHasEnded ||
!self.stdoutHasEnded ||
(self.exitCode == null && self.exitSignal == null)
(!self.timedOut &&
(!self.stderrHasEnded ||
!self.stdoutHasEnded ||
(self.exitCode == null && self.exitSignal == null))) ||
(self.timedOut && self.exitCode == null && self.exitSignal == null)
)
return;

self.finished = true;

if (self.timeoutId) {
clearTimeout(self.timeoutId);
self.timeoutId = null;
}

let err: PythonShellError;
if (self.exitCode && self.exitCode !== 0) {
if (self.timedOut) {
err = new PythonShellError(
'process timed out after ' + self.timeout + 'ms',
);
err = <PythonShellError>extend(err, {
executable: pythonPath,
options: pythonOptions.length ? pythonOptions : null,
script: self.scriptPath,
args: scriptArgs.length ? scriptArgs : null,
exitCode: self.exitCode,
exitSignal: self.exitSignal,
timeout: self.timeout,
});
if (self.timeoutParserError) {
err.parserError = self.timeoutParserError;
}
if (self.listeners('pythonError').length || !self._endCallback) {
self.emit('pythonError', err);
}
} else if (self.exitCode && self.exitCode !== 0) {
if (errorData) {
err = self.parseError(errorData);
} else {
Expand All @@ -272,6 +377,7 @@ export class PythonShell extends EventEmitter {
script: self.scriptPath,
args: scriptArgs.length ? scriptArgs : null,
exitCode: self.exitCode,
exitSignal: self.exitSignal,
});
// do not emit error if only a callback is used
if (self.listeners('pythonError').length || !self._endCallback) {
Expand Down Expand Up @@ -450,7 +556,7 @@ export class PythonShell extends EventEmitter {
* Sends a kill signal to the process
* @returns {PythonShell} The same instance for chaining calls
*/
kill(signal?: NodeJS.Signals) {
kill(signal?: NodeJS.Signals | number) {
this.terminated = this.childProcess.kill(signal);
return this;
}
Expand All @@ -459,7 +565,7 @@ export class PythonShell extends EventEmitter {
* Alias for kill.
* @deprecated
*/
terminate(signal?: NodeJS.Signals) {
terminate(signal?: NodeJS.Signals | number) {
// todo: remove this next breaking release
return this.kill(signal);
}
Expand Down
9 changes: 9 additions & 0 deletions test/python/exit_with_open_stdout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import subprocess
import sys


subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(1)"],
stdout=sys.stdout,
stderr=sys.stderr,
)
8 changes: 8 additions & 0 deletions test/python/ignore_sigterm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import signal
import time


signal.signal(signal.SIGTERM, lambda signum, frame: None)

while True:
time.sleep(0.05)
7 changes: 7 additions & 0 deletions test/python/print_invalid_json_then_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import sys
import time


sys.stdout.write("{")
sys.stdout.flush()
time.sleep(1)
7 changes: 7 additions & 0 deletions test/python/print_partial_then_sleep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import sys
import time


sys.stdout.write("partial output")
sys.stdout.flush()
time.sleep(1)
103 changes: 103 additions & 0 deletions test/test-python-shell.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,17 @@ describe('PythonShell', function () {
done();
});
});
it('should clear timeout when python fails to spawn', function (done) {
let pyshell = new PythonShell('exit-code.py', {
pythonPath: 'foeisjofseij',
timeout: 10000,
});
pyshell.on('error', (err) => {
err.code.should.eql('ENOENT');
should((pyshell as any).timeoutId).be.null();
done();
});
});
it('should spawn a Python process with script arguments', function (done) {
let pyshell = new PythonShell('echo_args.py', {
args: ['hello', 'world'],
Expand Down Expand Up @@ -217,6 +228,98 @@ describe('PythonShell', function () {
done();
});
});
it('should reject when the process exceeds the timeout', function (done) {
this.timeout(3000);
PythonShell.run('infinite_loop.py', { timeout: 100 }).then(
() => {
done(new Error('expected timeout rejection'));
},
(err) => {
err.should.be.an.Error;
err.message.should.be.exactly('process timed out after 100ms');
err.timeout.should.be.exactly(100);
done();
},
);
});
it('should reject when timeout expires after the process exits but before stdio closes', function (done) {
this.timeout(3000);
PythonShell.run('exit_with_open_stdout.py', { timeout: 100 }).then(
() => {
done(new Error('expected timeout rejection'));
},
(err) => {
err.should.be.an.Error;
err.message.should.be.exactly('process timed out after 100ms');
err.exitCode.should.be.exactly(0);
err.timeout.should.be.exactly(100);
done();
},
);
});
it('should reject when the process ignores the timeout signal', function (done) {
this.timeout(3000);
PythonShell.run('ignore_sigterm.py', { timeout: 100 }).then(
() => {
done(new Error('expected timeout rejection'));
},
(err) => {
err.should.be.an.Error;
err.message.should.be.exactly('process timed out after 100ms');
err.timeout.should.be.exactly(100);
done();
},
);
});
it('should flush buffered output before rejecting on timeout', function (done) {
this.timeout(3000);
PythonShell.run('print_partial_then_sleep.py', { timeout: 100 }).then(
() => {
done(new Error('expected timeout rejection'));
},
(err) => {
err.should.be.an.Error;
err.logs.should.eql(['partial output']);
err.timeout.should.be.exactly(100);
done();
},
);
});
it('should reject on timeout when flushing buffered output throws', function (done) {
this.timeout(3000);
PythonShell.run('print_invalid_json_then_sleep.py', {
mode: 'json',
timeout: 100,
}).then(
() => {
done(new Error('expected timeout rejection'));
},
(err) => {
err.should.be.an.Error;
err.message.should.be.exactly('process timed out after 100ms');
err.parserError.should.be.an.Error;
err.timeout.should.be.exactly(100);
done();
},
);
});
it('should use killSignal when timeout kills the process', function (done) {
this.timeout(3000);
let pyshell = new PythonShell('infinite_loop.py', {
timeout: 100,
killSignal: 'SIGKILL',
});
let originalKill = pyshell.kill.bind(pyshell);
pyshell.kill = function (signal) {
signal.should.be.exactly('SIGKILL');
return originalKill(signal);
};
pyshell.end(function (err) {
err.should.be.an.Error;
err.timeout.should.be.exactly(100);
done();
});
});
it('should run multiple scripts and fail with an extended stack trace for each of them', function (done) {
let numberOfTimesToRun = 5;
for (let i = 0; i < numberOfTimesToRun; i++) {
Expand Down