Implementing ProcessObservable
The application in this chapter spawns subprocesses frequently, so it makes sense to wrap this functionality in a self-contained Observable. This Observable spawns a new subprocess, emits its output with onNext, and takes care of the onError and onComplete notifications:
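As a rough illustration of that contract, here is a minimal sketch that uses only PHP's built-in proc_open() and plain callbacks instead of RxPHP. The function name runProcess() and its three callback parameters are hypothetical and are not part of the chapter's code or of any library. They only mirror the onNext, onError and onComplete roles described above, and the sketch assumes that a non-zero exit code should be treated as an error.

```php
<?php
// Hypothetical helper for illustration only; not RxPHP and not the chapter's class.
function runProcess(string $cmd, callable $onNext, callable $onError, callable $onComplete): void
{
    $spec = [
        1 => ['pipe', 'w'], // the child's stdout
        2 => ['pipe', 'w'], // the child's stderr
    ];

    $proc = proc_open($cmd, $spec, $pipes);
    if (!is_resource($proc)) {
        $onError(new RuntimeException("Unable to start: $cmd"));
        return;
    }

    // Emit stdout one line at a time, the way an Observable would call onNext.
    // Note: stderr is only read afterwards, so a child that writes a very large
    // amount to stderr could block; this is acceptable for a small sketch.
    while (($line = fgets($pipes[1])) !== false) {
        $onNext(rtrim($line, "\r\n"));
    }
    $stderr = stream_get_contents($pipes[2]);

    fclose($pipes[1]);
    fclose($pipes[2]);
    $exitCode = proc_close($proc);

    // Assumption: exit code 0 means success, anything else is reported as an error.
    if ($exitCode === 0) {
        $onComplete();
    } else {
        $onError(new RuntimeException("Exit code $exitCode: " . trim((string) $stderr)));
    }
}

// Usage: print every line of output, then report how the process ended.
runProcess(
    'echo hello',
    function ($line) { echo "next: $line\n"; },
    function ($e) { echo "error: " . $e->getMessage() . "\n"; },
    function () { echo "complete\n"; }
);
```

This version blocks until the process exits and offers no way to cancel it. The chapter's own class, which builds on RxPHP, begins as follows: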
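// ProcessObservable.php
use Rx\Observable;
use Rx\Observer\AutoDetachObserver;
use Rx\Disposable\CompositeDisposable;
use Symfony\Component\Process\Process;

class ProcessObservable extends Observable {
    private $cmd;
    private $pidFile;

    public function __construct($cmd, $pidFile = null) {
        $this->cmd = $cmd;
        $this->pidFile = $pidFile;
    }

    public function subscribe($observer, $scheduler = null) {
        // Start the subprocess without blocking the PHP script.
        $process = new Process($this->cmd);
        $process->start();

        // Optionally store the PID so the process can be found later.
        $pid = $process->getPid();
        if ($this->pidFile) {
            file_put_contents($this->pidFile, $pid);
        }

        $disposable = new CompositeDisposable();
        $autoObs = new AutoDetachObserver($observer...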