Implementing ProcessObservable
The application in this chapter is going to spawn subprocesses a lot so it makes sense to wrap this functionality into a self-sufficient Observable. This Observable will spawn a new subprocess, emit its output with onNext and also properly handle onError and onComplete notifications:
// ProcessObservable.php 
class ProcessObservable extends Observable { 
  private $cmd; 
  private $pidFile; 
 
  public function __construct($cmd, $pidFile = null) { 
    $this->cmd = $cmd; 
    $this->pidFile = $pidFile; 
  } 
 
  public function subscribe($observer, $scheduler = null) { 
    $process = new Process($this->cmd); 
    $process->start(); 
 
    $pid = $process->getPid(); 
    if ($this->pidFile) { 
      file_put_contents($this->pidFile, $pid); 
    } 
 
    $disposable = new CompositeDisposable(); 
    $autoObs = new AutoDetachObserver($observer...