diff --git a/campaignion_m2t_send/campaignion_m2t_send.info b/campaignion_m2t_send/campaignion_m2t_send.info new file mode 100644 index 00000000..c07e4c15 --- /dev/null +++ b/campaignion_m2t_send/campaignion_m2t_send.info @@ -0,0 +1,11 @@ +name = Match to target send +description = Send M2T messages asynchronously +core = 7.x +package = Campaignion + +dependencies[] = psr0:psr0 +dependencies[] = campaignion:campaignion_email_to_target +dependencies[] = little_helpers:little_helpers +dependencies[] = ultimate_cron:ultimate_cron +dependencies[] = variable:variable +dependencien[] = webform:webform diff --git a/campaignion_m2t_send/campaignion_m2t_send.install b/campaignion_m2t_send/campaignion_m2t_send.install new file mode 100644 index 00000000..a4966c85 --- /dev/null +++ b/campaignion_m2t_send/campaignion_m2t_send.install @@ -0,0 +1,105 @@ + $types]); + $nids = db_select('campaignion_m2t_send', 'm') + ->fields('m', ['nid']) + ->groupBy('nid') + ->execute() + ->fetchCol(); + $nodes = entity_load('node', $nids); + + foreach (Submission::iterate($nodes) as $submission) { + foreach (array_keys($submission->webform->componentsByType('e2t_selector')) as $cid) { + foreach ($submission->valuesByCid($cid) as $no => $data) { + $m = unserialize($data); + if ($m['sent'] ?? NULL) { + db_update('campaignion_m2t_send') + ->condition('nid', $submission->nid) + ->condition('sid', $submission->sid) + ->condition('cid', $cid) + ->condition('no', $no) + ->fields(['sent_at' => 0, 'target_email' => $m['target']['email']]) + ->execute(); + } + } + } + } +} + +/** + * Implements hook_schema(). + */ +function campaignion_m2t_send_schema() { + $tables['campaignion_m2t_send'] = [ + 'description' => 'Stores the send status of email to target emails.', + 'fields' => [ + 'nid' => [ + 'description' => 'The node identifier of a webform.', + 'type' => 'int', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'default' => 0, + ], + 'sid' => [ + 'description' => 'The unique identifier for this submission.', + 'type' => 'int', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'default' => 0, + ], + 'cid' => [ + 'description' => 'The identifier for this component within this node, starts at 0 for each node.', + 'type' => 'int', + 'size' => 'small', + 'unsigned' => TRUE, + 'not null' => TRUE, + 'default' => 0, + ], + 'no' => [ + 'description' => 'Usually this value is 0, but if a field has multiple values (such as a time or date), it may require multiple rows in the database.', + 'type' => 'varchar', + 'length' => 128, + 'not null' => TRUE, + 'default' => '0', + ], + 'target_email' => [ + 'description' => 'Email address of the target the email was sent to.', + 'type' => 'varchar', + 'length' => 256, + 'not null' => FALSE, + ], + 'sent_at' => [ + 'desrciption' => 'The timestamp for when the email was sent (if)', + 'type' => 'int', + 'not null' => FALSE, + ], + ], + 'primary key' => ['nid', 'sid', 'cid', 'no'], + 'indexes' => [ + 'node' => ['nid'], + 'submission' => ['sid'], + 'component' => ['nid', 'sid', 'cid'], + 'unsent' => ['nid', 'sent_at'], + 'target' => ['target_email'], + ], + ]; + return $tables; +} diff --git a/campaignion_m2t_send/campaignion_m2t_send.module b/campaignion_m2t_send/campaignion_m2t_send.module new file mode 100644 index 00000000..800d27b9 --- /dev/null +++ b/campaignion_m2t_send/campaignion_m2t_send.module @@ -0,0 +1,129 @@ + SendMessagesCron::class, + 'arguments' => [ + '!campaignion_m2t_send_enabled_nodes', + '!campaignion_m2t_send_cron_time_limit', + ], + ]; + return $info; +} + +/** + * Implements hook_cronapi(). + */ +function campaignion_m2t_send_cronapi($op, $job = NULL) { + $items['campaignion_m2t_send'] = [ + 'description' => 'Send M2T messages of selected nodes', + 'rule' => '*+@ 8-23 * * *', + 'weight' => 100, + 'callback' => '_campaignion_m2t_send_run', + 'arguments' => ['campaignion_m2t_send.SendMessagesCron'], + ]; + return $items; +} + +/** + * Helper function to load a cron service and invoke it. + */ +function _campaignion_m2t_send_run($service) { + Container::get()->loadService($service)->run(); +} + +/** + * Helper function to get all M2T content types. + * + * @return str[] Array of node type machine names. + */ +function _campaignion_m2t_send_content_types() { + $types = &drupal_static(__FUNCTION__); + if ($types === NULL) { + $info = array_filter(module_invoke_all('campaignion_action_info'), function ($i) { + return ($i['channel'] ?? NULL) == EmailNoSend::class; + }); + $types = array_keys($info); + } + return $types; +} + +/** + * Implements hook_webform_submission_insert(). + */ +function campaignion_m2t_send_webform_submission_insert($node, $submission) { + if (!in_array($node->type, _campaignion_m2t_send_content_types())) { + return; + } + $s = new Submission($node, $submission); + foreach ($s->webform->componentsByType('e2t_selector') as $component) { + $values = $s->valuesByCid($component['cid']); + db_delete('campaignion_m2t_send') + ->condition('nid', $node->nid) + ->condition('sid', $submission->sid) + ->condition('cid', $component['cid']) + ->condition('no', array_keys($values), 'NOT IN') + ->execute(); + foreach ($values as $no => $value) { + db_merge('campaignion_m2t_send')->key([ + 'nid' => $node->nid, + 'sid' => $submission->sid, + 'cid' => $component['cid'], + 'no' => $no, + ])->execute(); + } + } +} + +/** + * Implements hook_webform_submission_load(). + */ +function campaignion_m2t_send_webform_submission_load(&$submissions) { + $data_sql = << array_keys($submissions)]) as $d) { + $submissions[$d->sid]->m2t_unsent_messages[] = $d; + } +} + +/** + * Implements hook_webform_submission_update(). + */ +function campaignion_m2t_send_webform_submission_update($node, $submission) { + campaignion_m2t_send_webform_submission_insert($node, $submission); +} + +/** + * Implements hook_webform_submission_delete(). + */ +function campaignion_m2t_send_webform_submission_delete($node, $submission) { + db_delete('campaignion_m2t_send') + ->condition('sid', $submission->sid) + ->execute(); +} + +/** + * Implements hook_node_delete(). + */ +function campaignion_m2t_send_node_delete($node) { + db_delete('campaignion_m2t_send') + ->condition('nid', $node->nid) + ->execute(); +} diff --git a/campaignion_m2t_send/campaignion_m2t_send.variable.inc b/campaignion_m2t_send/campaignion_m2t_send.variable.inc new file mode 100644 index 00000000..a051e22d --- /dev/null +++ b/campaignion_m2t_send/campaignion_m2t_send.variable.inc @@ -0,0 +1,26 @@ + t('Time limit for M2T send cron-jobs'), + 'description' => t('When a m2t send cron-job has been running for more than this amount of seconds no new batch will be started during this cron-run.'), + 'type' => 'number', + 'default' => 20, + 'localize' => FALSE, + ]; + $v['campaignion_m2t_send_enabled_nodes'] = [ + 'title' => t('Enabled nodes'), + 'description' => t('Only send emails for these nodes.'), + 'type' => 'unknown', + 'default' => [], + ]; + return $v; +} diff --git a/campaignion_m2t_send/src/SendMessagesCron.php b/campaignion_m2t_send/src/SendMessagesCron.php new file mode 100644 index 00000000..88641239 --- /dev/null +++ b/campaignion_m2t_send/src/SendMessagesCron.php @@ -0,0 +1,174 @@ + 0, + 'withheld' => 0, + ]; + + /** + * Cron time limit in seconds. + * + * @var int + */ + protected $timeLimit; + + /** + * Create a new cron-job instance based on config. + */ + public function __construct(array $enabled_nodes, int $time_limit) { + $this->enabledNodes = $enabled_nodes; + $this->timeLimit = $time_limit; + } + + /** + * Fetch targets from the e2t service. + * + * @return array + * Targets with email address grouped by party name. + */ + protected function getCurrentTargets(Submission $submission, string $dataset) { + /** @var Drupal\campaignion_email_to_target\Api\Client */ + $client = Container::get()->loadService('campaignion_email_to_target.api.Client'); + return array_filter($client->getTargets($dataset, ['postcode' => $submission->valueByKey('postcode')]), function ($target) { + return (bool) ($target['email'] ?? NULL); + }); + } + + /** + * Replace the target in a message with a new target. + * + * @return array + * The message with the target replaced. + */ + protected function replaceTarget($m, $target) { + $m['target'] = $target; + $m['sent'] = TRUE; + $m['message']['toAddress'] = $target['email']; + $m['message']['toName'] = trim("{$target['title']} {$target['first_name']} {$target['last_name']}"); + $m['message']['header'] = preg_replace('/Dear .*,/', $m['message']['header'], "Dear {$target['salutation']},"); + return $m; + } + + /** + * Decide whether to send an additional email to the target or not. + * + * This should be somewhat random as to look natural. It also keeps the + * order of messages. + */ + protected function rateLimit($target) { + $email = $target['email']; + if (!($this->stopSending[$email] ?? FALSE)) { + if (rand(0, 1) === 1) { // 50:50 chance. + $this->messageStats['sent'] += 1; + return TRUE; + } + else { + $this->stopSending[$email] = TRUE; + } + } + $this->messageStats['withheld'] += 1; + return FALSE; + } + + /** + * Send the target emails for a submission using new data from the e2t-api. + */ + protected function processSubmission(Submission $submission, Action $action) { + $channel = new Email(); + $targets = $this->getCurrentTargets($submission, $action->getOptions()['dataset_name']); + $data = array_filter(array_map(function ($d) use ($targets) { + $m = unserialize($d->data); + if (($new_target = $targets[0] ?? NULL) && $this->rateLimit($new_target)) { + $d->new_data = serialize($this->replaceTarget($m, $new_target)); + $d->new_target = $new_target; + return $d; + } + return NULL; + }, $submission->m2t_unsent_messages)); + foreach ($data as $d) { + $component_o = Component::fromComponent($submission->webform->component($d->cid)); + $component_o->sendEmails([$d->new_data], $submission, $channel); + db_update('webform_submitted_data') + ->fields(['data' => $d->new_data]) + ->condition('nid', $submission->nid) + ->condition('sid', $submission->sid) + ->condition('cid', $d->cid) + ->condition('no', $d->no) + ->execute(); + db_merge('campaignion_m2t_send') + ->key(['nid' => $submission->nid, 'sid' => $submission->sid, 'cid' => $d->cid, 'no' => $d->no]) + ->fields(['sent_at' => time(), 'target_email' => $d->new_target['email']]) + ->execute(); + } + } + + /** + * Main function of the cron-job. + */ + public function run() { + module_load_include('inc', 'webform', 'includes/webform.submissions'); + $time_limit = REQUEST_TIME + $this->timeLimit; + $nids = array_keys(array_filter($this->enabledNodes)); + $nodes = entity_load('node', $nids); + $actions = array_map(function ($node) { + return ActionLoader::instance()->actionFromNode($node); + }, $nodes); + + $submission_sql = <<:last_sid + ORDER BY s.sid + LIMIT 100 + SQL; + + $count_processed = 0; + foreach (Submission::iterate($nodes, $submission_sql) as $submission) { + $this->processSubmission($submission, $actions[$submission->nid]); + $count_processed += 1; + if (time() > $time_limit) { + break; + } + } + $vars = [ + '@submissions' => $count_processed, + '@sent' => $this->messageStats['sent'], + '@withheld' => $this->messageStats['withheld'] + ]; + watchdog('campaignion_m2t_send', '@submissions submissions processed, @sent emails sent (@withheld emails not sent).', $vars, WATCHDOG_INFO); + } + +} diff --git a/campaignion_m2t_send/src/Submission.php b/campaignion_m2t_send/src/Submission.php new file mode 100644 index 00000000..232fbb63 --- /dev/null +++ b/campaignion_m2t_send/src/Submission.php @@ -0,0 +1,44 @@ +:last_sid + ORDER BY sid + LIMIT 100; + SQL; + + foreach ($nodes as $node) { + $args = [':last_sid' => 0, ':nid' => $node->nid]; + while ($sids = db_query($submission_sql, $args)->fetchCol()) { + foreach (webform_get_submissions(['ws.sid' => $sids]) as $s) { + yield new static($nodes[$s->nid], $s); + $args[':last_sid'] = $s->sid; + } + gc_collect_cycles(); + } + } + } + +}