Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

unify free and new for producer and consumer #45

Merged
merged 6 commits into from
Apr 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 2 additions & 55 deletions consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,47 +40,7 @@
#include "Zend/zend_exceptions.h"
#include "consumer_arginfo.h"

static zend_class_entry * ce;
static zend_object_handlers handlers;

static void kafka_consumer_free(zend_object *object) /* {{{ */
{
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
rd_kafka_resp_err_t err;
kafka_conf_callbacks_dtor(&intern->cbs);

if (intern->rk) {
err = rd_kafka_consumer_close(intern->rk);

if (err) {
php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err));
}

rd_kafka_destroy(intern->rk);
intern->rk = NULL;
}

kafka_conf_callbacks_dtor(&intern->cbs);

zend_object_std_dtor(&intern->std);
}
/* }}} */

static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
{
zend_object* retval;
kafka_object *intern;

intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type));
zend_object_std_init(&intern->std, class_type);
object_properties_init(&intern->std, class_type);

retval = &intern->std;
retval->handlers = &handlers;

return retval;
}
/* }}} */
zend_class_entry * ce_kafka_consumer;

static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */

Expand Down Expand Up @@ -165,7 +125,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
Z_PARAM_OPTIONAL
Z_PARAM_ARRAY_HT(htopars)
Z_PARAM_ARRAY_HT_OR_NULL(htopars)
ZEND_PARSE_PARAMETERS_END();

intern = get_kafka_object(getThis());
Expand Down Expand Up @@ -703,16 +663,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
ZVAL_LONG(highResult, high);
}
/* }}} */

void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */
{
zend_class_entry tmpce;

INIT_NS_CLASS_ENTRY(tmpce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods);
ce = zend_register_internal_class(&tmpce);
ce->create_object = kafka_consumer_new;

handlers = kafka_default_object_handlers;
handlers.free_obj = kafka_consumer_free;
handlers.offset = XtOffsetOf(kafka_object, std);
}
2 changes: 1 addition & 1 deletion consumer.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ class Consumer
{
public function __construct(Configuration $configuration) {}

public function assign(?array $topics): void {}
public function assign(?array $topics = null): void {}

public function getAssignment(): array {}

Expand Down
6 changes: 3 additions & 3 deletions consumer_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: ba3bc0a741bc6eab7a23a15ca6d83c24e99b23de */
* Stub hash: 091c6b60081bb08ec174ef87b9cc6d2b3fbba461 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1)
ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 1, IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 1)
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 0, IS_VOID, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topics, IS_ARRAY, 1, "null")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getAssignment, 0, 0, IS_ARRAY, 0)
Expand Down
24 changes: 12 additions & 12 deletions kafka_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,28 +1,28 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: aac20095e4ad448dfdc0f3a25d87cbb17f9f1581 */
* Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */

ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, 0, 0, IS_LONG, 0)
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, 0, 1, IS_LONG, 0)
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0)
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, 0, 2, IS_ARRAY, 0)
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0)
ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0)
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
ZEND_END_ARG_INFO()
Expand All @@ -35,11 +35,11 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);


static const zend_function_entry class_SimpleKafkaClient_SimpleKafkaClient_methods[] = {
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, ZEND_ACC_PUBLIC)
static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
ZEND_FE_END
};
2 changes: 1 addition & 1 deletion metadata_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: da83d0319c899361606dfa0ccf0fd439aeeabfbb */
* Stub hash: cbb5ab5aee4d07e0673bef67dcc2d045303ebfbd */

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_getOrigBrokerId, 0, 0, IS_LONG, 0)
ZEND_END_ARG_INFO()
Expand Down
2 changes: 1 addition & 1 deletion metadata_partition_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 934cef11a377e54b4d5f8cea75e6d590ec071d50 */
* Stub hash: 207c49cb01d8b564c1419d2c24d332cc321420f5 */

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Partition_getId, 0, 0, IS_LONG, 0)
ZEND_END_ARG_INFO()
Expand Down
2 changes: 1 addition & 1 deletion metadata_topic_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 9d73f729b3dca2b6ac7fd5fdc39ba23d768ca792 */
* Stub hash: db8552307bc3c0d4d6035ff10c00b7e2a39a152a */

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Topic_getName, 0, 0, IS_STRING, 0)
ZEND_END_ARG_INFO()
Expand Down
2 changes: 1 addition & 1 deletion php_simple_kafka_client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
#endif

extern zend_class_entry * ce_kafka_conf;
extern zend_class_entry * ce_kafka_consumer;
extern zend_class_entry * ce_kafka_error_exception;
extern zend_class_entry * ce_kafka_exception;
extern zend_class_entry * ce_kafka_producer;
Expand Down Expand Up @@ -189,7 +190,6 @@ static inline char *kafka_hash_get_current_key_ex(HashTable *ht, HashPosition *p
void kafka_error_init();
void create_kafka_error(zval *return_value, const rd_kafka_error_t *error);
void kafka_conf_init(INIT_FUNC_ARGS);
void kafka_consumer_init(INIT_FUNC_ARGS);
void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs);
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from);
void kafka_message_init(INIT_FUNC_ARGS);
Expand Down
20 changes: 17 additions & 3 deletions simple_kafka_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "ext/standard/info.h"
#include "php_simple_kafka_client_int.h"
#include "Zend/zend_exceptions.h"
#include "consumer_arginfo.h"
#include "functions_arginfo.h"
#include "producer_arginfo.h"
#include "kafka_arginfo.h"
Expand All @@ -66,7 +67,17 @@ static void kafka_free(zend_object *object) /* {{{ */
kafka_object *intern = php_kafka_from_obj(kafka_object, object);

if (intern->rk) {
zend_hash_destroy(&intern->topics);
if (RD_KAFKA_CONSUMER == intern->type) {
rd_kafka_resp_err_t err;

err = rd_kafka_consumer_close(intern->rk);

if (err) {
php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err));
}
} else if (RD_KAFKA_PRODUCER == intern->type) {
zend_hash_destroy(&intern->topics);
}

rd_kafka_destroy(intern->rk);
intern->rk = NULL;
Expand Down Expand Up @@ -332,17 +343,20 @@ PHP_MINIT_FUNCTION(simple_kafka_client)
kafka_object_handlers.free_obj = kafka_free;
kafka_object_handlers.offset = XtOffsetOf(kafka_object, std);

INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_SimpleKafkaClient_methods);
INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_Kafka_methods);
ce_kafka = zend_register_internal_class(&ce);
ce_kafka->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS;
ce_kafka->create_object = kafka_new;

INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Producer", class_SimpleKafkaClient_Producer_methods);
ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka);

INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods);
ce_kafka_consumer = zend_register_internal_class(&ce);
ce_kafka_consumer->create_object = kafka_new;

kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU);
kafka_error_init();
kafka_consumer_init(INIT_FUNC_ARGS_PASSTHRU);
kafka_message_init(INIT_FUNC_ARGS_PASSTHRU);
kafka_metadata_init(INIT_FUNC_ARGS_PASSTHRU);
kafka_metadata_topic_partition_init(INIT_FUNC_ARGS_PASSTHRU);
Expand Down
17 changes: 5 additions & 12 deletions tests/conf_callbacks_integration.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ $conf->set('statistics.interval.ms', 10);
$conf->set('log_level', (string) LOG_DEBUG);
$conf->set('debug', 'all');

$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
$offsetCommitCount = 0;
$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) use (&$offsetCommitCount) {
++$offsetCommitCount;
});

$statsCbCalled = false;
Expand Down Expand Up @@ -102,22 +103,14 @@ while (true) {
$consumer->commit($msg);
}

var_dump($offsetCommitCount);
var_dump($statsCbCalled);
var_dump($logCbCalled);
var_dump($topicsAssigned);
var_dump($delivered);

--EXPECT--
Offset 1 committed.
Offset 2 committed.
Offset 3 committed.
Offset 4 committed.
Offset 5 committed.
Offset 6 committed.
Offset 7 committed.
Offset 8 committed.
Offset 9 committed.
Offset 10 committed.
int(10)
bool(true)
bool(true)
bool(true)
Expand Down
2 changes: 1 addition & 1 deletion topic_partition_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 72b2c9a25e8751ae022cc233f4b7a0e382be72f8 */
* Stub hash: 95f09c698079d00927dd2d02910325d6aff76157 */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition___construct, 0, 0, 2)
ZEND_ARG_TYPE_INFO(0, topicName, IS_STRING, 0)
Expand Down