Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor object usage #44

Merged
merged 2 commits into from
Apr 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 34 additions & 52 deletions consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,12 @@
#include "Zend/zend_exceptions.h"
#include "consumer_arginfo.h"

typedef struct _object_intern {
rd_kafka_t *rk;
kafka_conf_callbacks cbs;
zend_object std;
} object_intern;

static zend_class_entry * ce;
static zend_object_handlers handlers;

static void kafka_consumer_free(zend_object *object) /* {{{ */
{
object_intern *intern = php_kafka_from_obj(object_intern, object);
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
rd_kafka_resp_err_t err;
kafka_conf_callbacks_dtor(&intern->cbs);

Expand All @@ -75,9 +69,9 @@ static void kafka_consumer_free(zend_object *object) /* {{{ */
static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
{
zend_object* retval;
object_intern *intern;
kafka_object *intern;

intern = ecalloc(1, sizeof(object_intern)+ zend_object_properties_size(class_type));
intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type));
zend_object_std_init(&intern->std, class_type);
object_properties_init(&intern->std, class_type);

Expand All @@ -88,18 +82,6 @@ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
}
/* }}} */

static object_intern * get_object(zval *zconsumer) /* {{{ */
{
object_intern *oconsumer = Z_KAFKA_P(object_intern, zconsumer);

if (!oconsumer->rk) {
zend_throw_exception_ex(NULL, 0, "SimpleKafkaClient\\Consumer::__construct() has not been called");
return NULL;
}

return oconsumer;
} /* }}} */

static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */

size_t len;
Expand All @@ -125,15 +107,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct)
zval *zconf;
char errstr[512];
rd_kafka_t *rk;
object_intern *intern;
kafka_object *intern;
kafka_conf_object *conf_intern;
rd_kafka_conf_t *conf = NULL;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_OBJECT_OF_CLASS(zconf, ce_kafka_conf)
ZEND_PARSE_PARAMETERS_END();

intern = Z_KAFKA_P(object_intern, getThis());
intern = Z_KAFKA_P(kafka_object, getThis());

conf_intern = get_kafka_conf_object(zconf);
if (conf_intern) {
Expand Down Expand Up @@ -175,7 +157,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
HashTable *htopars = NULL;
rd_kafka_topic_partition_list_t *topics;
rd_kafka_resp_err_t err;
object_intern *intern;
kafka_object *intern;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "|h!", &htopars) == FAILURE) {
return;
Expand All @@ -186,7 +168,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
Z_PARAM_ARRAY_HT(htopars)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -219,12 +201,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment)
{
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *topics;
object_intern *intern;
kafka_object *intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -247,7 +229,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
{
HashTable *htopics;
HashPosition pos;
object_intern *intern;
kafka_object *intern;
rd_kafka_topic_partition_list_t *topics;
rd_kafka_resp_err_t err;
zval *zv;
Expand All @@ -256,7 +238,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
Z_PARAM_ARRAY_HT(htopics)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -287,13 +269,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
{
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *topics;
object_intern *intern;
kafka_object *intern;
int i;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -319,13 +301,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
Unsubscribe from the current subscription set */
ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
{
object_intern *intern;
kafka_object *intern;
rd_kafka_resp_err_t err;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -343,15 +325,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
Consume message or get error event, triggers callbacks */
ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
{
object_intern *intern;
kafka_object *intern;
zend_long timeout_ms;
rd_kafka_message_t *rkmessage, rkmessage_tmp = {0};

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_LONG(timeout_ms)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -374,7 +356,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
{
zval *zarg = NULL;
object_intern *intern;
kafka_object *intern;
rd_kafka_topic_partition_list_t *offsets = NULL;
rd_kafka_resp_err_t err;

Expand All @@ -383,7 +365,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
Z_PARAM_ZVAL(zarg)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -476,12 +458,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync)
Close connection */
ZEND_METHOD(SimpleKafkaClient_Consumer, close)
{
object_intern *intern;
kafka_object *intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -499,7 +481,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
zval *only_zrkt = NULL;
zend_long timeout_ms;
rd_kafka_resp_err_t err;
object_intern *intern;
kafka_object *intern;
const rd_kafka_metadata_t *metadata;
kafka_topic_object *only_orkt = NULL;

Expand All @@ -510,7 +492,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -540,14 +522,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle)
char *topic;
size_t topic_len;
rd_kafka_topic_t *rkt;
object_intern *intern;
kafka_object *intern;
kafka_topic_object *topic_intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_STRING(topic, topic_len)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -577,7 +559,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
{
HashTable *htopars = NULL;
zend_long timeout_ms;
object_intern *intern;
kafka_object *intern;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *topics;

Expand All @@ -586,7 +568,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
Z_PARAM_LONG(timeout_ms)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand Down Expand Up @@ -615,15 +597,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
{
HashTable *htopars = NULL;
object_intern *intern;
kafka_object *intern;
rd_kafka_resp_err_t err;
rd_kafka_topic_partition_list_t *topics;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_ARRAY_HT(htopars)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -650,7 +632,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
{
HashTable *htopars = NULL;
object_intern *intern;
kafka_object *intern;
rd_kafka_topic_partition_list_t *topicPartitions;
zend_long timeout_ms;
rd_kafka_resp_err_t err;
Expand All @@ -660,7 +642,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
Z_PARAM_LONG(timeout_ms)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -686,7 +668,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
{
object_intern *intern;
kafka_object *intern;
char *topic;
size_t topic_length;
long low, high;
Expand All @@ -705,7 +687,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
ZVAL_DEREF(lowResult);
ZVAL_DEREF(highResult);

intern = get_object(getThis());
intern = get_kafka_object(getThis());
if (!intern) {
return;
}
Expand All @@ -732,5 +714,5 @@ void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */

handlers = kafka_default_object_handlers;
handlers.free_obj = kafka_consumer_free;
handlers.offset = XtOffsetOf(object_intern, std);
handlers.offset = XtOffsetOf(kafka_object, std);
}
2 changes: 1 addition & 1 deletion tests/produce_consume_transactional.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ $topicName = sprintf("test_kafka_%s", uniqid());

$topic = $producer->getTopicHandle($topicName);

if (!$producer->getMetadata(false, 2*1000, $topic)) {
if (!$producer->getMetadata(false, 5*1000, $topic)) {
echo "Failed to get metadata, is broker down?\n";
}

Expand Down