-
Notifications
You must be signed in to change notification settings - Fork 438
/
BatchJob.php
207 lines (191 loc) · 6.01 KB
/
BatchJob.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
<?php
/**
* Copyright 2017 Google Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Google\Cloud\Core\Batch;
use Google\Cloud\Core\SysvTrait;
/**
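* Represents a batch job: items received from a System V message queue are
* collected and handed to a handler callable in batches.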
*
* @experimental The experimental flag means that while we believe this method
* or class is ready for use, it may change before release in backwards-
* incompatible ways. Please use with caution, and test thoroughly when
* upgrading.
*/
class BatchJob implements JobInterface
{
    use JobTrait;
    use SysvTrait;
    use InterruptTrait;
    use HandleFailureTrait;

    const DEFAULT_BATCH_SIZE = 100;
    const DEFAULT_CALL_PERIOD = 2.0;
    const DEFAULT_WORKERS = 1;

    /**
     * @var callable The batch job handler. This callable accepts an array
     *      of items and should return a boolean.
     */
    private $func;

    /**
     * @var int The size of the batch.
     */
    private $batchSize;

    /**
     * @var float The period in seconds from the last execution to force
     *      executing the job.
     */
    private $callPeriod;

    /**
     * @param string $identifier Unique identifier of the job.
     * @param callable $func Any Callable except for Closure. The callable
     *        should accept an array of items as the first argument.
     * @param int $idNum A numeric id for the job.
     * @param array $options [optional] {
     *     Configuration options.
     *
     *     @type int $batchSize The size of the batch. **Defaults to** `100`.
     *     @type float $callPeriod The period in seconds from the last execution
     *           to force executing the job. **Defaults to** `2.0`.
     *     @type int $numWorkers The number of child processes. It only takes
     *           effect with the {@see \Google\Cloud\Core\Batch\BatchDaemon}.
     *           **Defaults to** `1`.
     *     @type string $bootstrapFile A file to load before executing the
     *           job. It's needed for registering global functions.
     * }
     */
    public function __construct(
        $identifier,
        $func,
        $idNum,
        array $options = []
    ) {
        // Array union keeps caller-supplied values and only fills in the
        // keys that are missing.
        $options += [
            'batchSize' => self::DEFAULT_BATCH_SIZE,
            'callPeriod' => self::DEFAULT_CALL_PERIOD,
            'bootstrapFile' => null,
            'numWorkers' => self::DEFAULT_WORKERS
        ];

        $this->identifier = $identifier;
        $this->func = $func;
        $this->id = $idNum;
        $this->batchSize = $options['batchSize'];
        $this->callPeriod = $options['callPeriod'];
        $this->bootstrapFile = $options['bootstrapFile'];
        $this->numWorkers = $options['numWorkers'];
        $this->initFailureFile();
    }

    /**
     * Run the job.
     *
     * Receives items from the message queue for this job's id in a loop and
     * flushes them through {@see BatchJob::flush()} whenever the batch is
     * full, the call period has elapsed, or the shutdown flag is set. The
     * loop only returns once the shutdown flag is set.
     *
     * A file-backed message whose file cannot be read, or whose contents
     * cannot be unserialized, is skipped instead of being queued as a bogus
     * `false` item; the file is left in place in that case.
     */
    public function run()
    {
        $this->setupSignalHandlers();

        $sysvKey = $this->getSysvKey($this->id);
        $q = msg_get_queue($sysvKey);
        $items = [];
        $lastInvoked = microtime(true);

        if (!is_null($this->bootstrapFile)) {
            require_once($this->bootstrapFile);
        }

        while (true) {
            // Fire SIGALRM after 1 second to unblock the blocking call.
            pcntl_alarm(1);
            if (msg_receive(
                $q,
                0,
                $type,
                8192,
                $message,
                true,
                0 // blocking mode
            )) {
                if ($type === self::$typeDirect) {
                    // The message itself is the item.
                    $items[] = $message;
                } elseif ($type === self::$typeFile) {
                    // The message is the path of a file holding the
                    // serialized item.
                    // NOTE(review): unserialize() trusts the file contents;
                    // confirm that only trusted processes can write to the
                    // queue and to these files.
                    $serialized = file_get_contents($message);
                    if ($serialized !== false) {
                        $item = unserialize($serialized);
                        // unserialize() returns false on failure; comparing
                        // against serialize(false) tells a genuine `false`
                        // payload apart from a corrupt one.
                        if ($item !== false || $serialized === serialize(false)) {
                            $items[] = $item;
                            @unlink($message);
                        }
                    }
                }
            }
            pcntl_signal_dispatch();

            // It runs the job when
            // 1. Number of items reaches the batchSize.
            // 2-a. Count is >0 and the current time is larger than lastInvoked + period.
            // 2-b. Count is >0 and the shutdown flag is true.
            $pending = count($items);
            if ($pending >= $this->batchSize
                || ($pending > 0
                    && (microtime(true) > $lastInvoked + $this->callPeriod
                        || $this->shutdown))
            ) {
                printf(
                    'Running the job with %d items' . PHP_EOL,
                    $pending
                );
                $this->flush($items);
                $items = [];
                $lastInvoked = microtime(true);
            }

            // Force a cycle collection on every iteration (presumably to
            // keep memory flat in this long-running loop).
            gc_collect_cycles();

            if ($this->shutdown) {
                return;
            }
        }
    }

    /**
     * Finish any pending activity for this job.
     *
     * Passes the items to the handler; when the handler returns a falsy
     * value the items are handed to `handleFailure()` together with the
     * job id.
     *
     * @param array $items
     * @return bool `true` when the handler returned a truthy value,
     *         `false` otherwise.
     */
    public function flush(array $items = [])
    {
        if ($this->callFunc($items)) {
            return true;
        }

        $this->handleFailure($this->id, $items);
        return false;
    }

    /**
     * Invoke the job handler with the given items.
     *
     * @access private
     * @internal
     *
     * @param array $items Passed to the handler as its single argument.
     * @return bool The handler's return value, passed through unchanged.
     */
    public function callFunc(array $items = [])
    {
        return call_user_func($this->func, $items);
    }

    /**
     * Returns the period in seconds from the last execution to force
     * executing the job.
     *
     * @return float
     */
    public function getCallPeriod()
    {
        return $this->callPeriod;
    }

    /**
     * Returns the batch size.
     *
     * @return int
     */
    public function getBatchSize()
    {
        return $this->batchSize;
    }
}