Skip to content

Commit

Permalink
Add a "main loop" that processes each stage of the pipeline explicitly
Browse files Browse the repository at this point in the history
  • Loading branch information
adamziel committed Aug 27, 2024
1 parent 1234a10 commit bd19ad7
Showing 1 changed file with 91 additions and 12 deletions.
103 changes: 91 additions & 12 deletions pipes-controller.php
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,11 @@ function (ZipStreamReader $zip_reader, ByteStreamState $state) {
}
}

// ----------------------------------------------------------------------------
// Here's a stream-based pipeline that fetches a ZIP file from a remote server,
// unzips it, skips the first file, processes the XML files, and uppercases the
// output.
// ----------------------------------------------------------------------------
$chain = new StreamChain(
[
'http' => HTTP_Client::stream([
Expand All @@ -624,18 +629,92 @@ function (ZipStreamReader $zip_reader, ByteStreamState $state) {
// var_dump([$chain->next_chunk(), strlen($chain->get_bytes()), $chain->get_last_error()]);

// Or like this:
$chain->stop_on_errors(true);
foreach($chain as $chunk) {
switch($chunk->get_chunk_type()) {
case '#error':
echo "Error: " . $chunk->get_last_error() . "\n";
break;
case '#bytes':
var_dump([
$chunk->get_bytes(),
'zip file_id' => isset($chain['zip']) ? $chain['zip']->get_file_id() : null
]);
// $chain->stop_on_errors(true);
// foreach($chain as $chunk) {
// switch($chunk->get_chunk_type()) {
// case '#error':
// echo "Error: " . $chunk->get_last_error() . "\n";
// break;
// case '#bytes':
// var_dump([
// $chunk->get_bytes(),
// 'zip file_id' => isset($chain['zip']) ? $chain['zip']->get_file_id() : null
// ]);
// break;
// }
// }


// ----------------------------------------------------------
// And here's a loop-based pipeline that does the same thing:
// ----------------------------------------------------------

$client = new Client();
$client->enqueue([
new Request('http://127.0.0.1:9864/export.wxr.zip'),
new Request('http://127.0.0.1:9865')
]);

$zip_readers = [];
$xml_processors = [];
$xml_tokens_found = [];
while ($client->await_next_event()) {
// Fetch HTTP data
$request = $client->get_request();
switch ($client->get_event()) {
case Client::EVENT_BODY_CHUNK_AVAILABLE:
// Continue to the next stage
break;
case Client::EVENT_FAILED:
error_log('Request failed: ' . $request->error);
default:
continue 2;
}

// Unzip the file
$zip_reader = $zip_readers[$request->id] ?? new ZipStreamReader();
$zip_reader->append_bytes($client->get_response_body_chunk());
while ($zip_reader->next()) {
switch ($zip_reader->get_state()) {
case ZipStreamReader::STATE_FILE_ENTRY:
// Continue to the next stage
break;
default:
continue 2;
}

if($zip_reader->get_file_path() === 'export.wxr') {
continue;
}

// Process the XML
$xml_processor = $xml_processors[$request->id] ?? new WP_XML_Processor('', [], WP_XML_Processor::IN_PROLOG_CONTEXT);
$xml_processor->append_bytes($zip_reader->get_file_body_chunk());

$xml_tokens_found[$request->id] ??= 0;
while ($xml_processor->next_token()) {
++$xml_tokens_found[$request->id];
// Process the XML
}

$buffer = '';
if ($xml_tokens_found[$request->id] > 0) {
$buffer .= $xml_processor->get_updated_xml();
} else if (
$xml_tokens_found[$request->id] === 0 &&
!$xml_processor->is_paused_at_incomplete_input() &&
$xml_processor->get_current_depth() === 0
) {
// We've reached the end of the document, let's finish up.
// @TODO: Fix this so it doesn't return the entire XML
$buffer .= $xml_processor->get_unprocessed_xml();
}

if (!strlen($buffer)) {
continue;
}

// Uppercase the output
echo strtoupper($buffer);
}
}

0 comments on commit bd19ad7

Please sign in to comment.