Skip to content

Commit

Permalink
Optimize server code (#5478)
Browse files Browse the repository at this point in the history
* Optimize code

* Optimize code

* fix tests

* optimize code

* fix tests

* fix tests

* fix tests [3]

* fix tests [4]

* fix tests [5]
  • Loading branch information
matyhtf authored Sep 14, 2024
1 parent 81fd4e8 commit 7e30a33
Show file tree
Hide file tree
Showing 14 changed files with 325 additions and 250 deletions.
67 changes: 59 additions & 8 deletions core-tests/src/os/process_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,36 +11,73 @@ using namespace swoole;

static void test_func(ProcessPool &pool) {
EventData data{};
data.info.len = strlen(TEST_JPG_MD5SUM);
memcpy(data.data, TEST_JPG_MD5SUM, data.info.len);
size_t size = swoole_system_random(1024, 4096);
String rmem(size);
rmem.append_random_bytes(size - 1);
rmem.append("\0");

data.info.len = size;
memcpy(data.data, rmem.value(), size);

int worker_id = -1;
ASSERT_EQ(pool.dispatch_blocking(&data, &worker_id), SW_OK);

pool.running = true;
pool.ptr = &rmem;
SwooleWG.run_always = true;
pool.main_loop(&pool, pool.get_worker(0));
pool.destroy();
}

static void test_func_task_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_TASK);
pool.onTask = [](ProcessPool *pool, Worker *worker, EventData *task) -> int {
pool->running = false;
EXPECT_MEMEQ(task->data, TEST_JPG_MD5SUM, task->info.len);
String *_data = (String *) pool->ptr;
usleep(10000);
EXPECT_MEMEQ(_data->str, task->data, task->len());
return 0;
};
pool.main_loop(&pool, pool.get_worker(0));
pool.destroy();
test_func(pool);
}

static void test_func_message_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_MESSAGE);
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
pool->running = false;
String *_data = (String *) pool->ptr;
usleep(10000);
EXPECT_MEMEQ(_data->str, rdata->data, rdata->info.len);
};
test_func(pool);
}

static void test_func_stream_protocol(ProcessPool &pool) {
pool.set_protocol(SW_PROTOCOL_STREAM);
pool.onMessage = [](ProcessPool *pool, RecvData *rdata) {
pool->running = false;
String *_data = (String *) pool->ptr;
EventData *msg = (EventData *) rdata->data;
usleep(10000);
EXPECT_MEMEQ(_data->str, msg->data, msg->len());
};
test_func(pool);
}

TEST(process_pool, tcp) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_SOCKET), SW_OK);
ASSERT_EQ(pool.listen(TEST_HOST, TEST_PORT, 128), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, unix_sock) {
ProcessPool pool{};
signal(SIGPIPE, SIG_IGN);
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, tcp_raw) {
Expand Down Expand Up @@ -72,7 +109,21 @@ TEST(process_pool, msgqueue) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0x9501, SW_IPC_MSGQUEUE), SW_OK);

test_func(pool);
test_func_task_protocol(pool);
}

TEST(process_pool, message_protocol) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func_message_protocol(pool);
}

TEST(process_pool, stream_protocol) {
ProcessPool pool{};
ASSERT_EQ(pool.create(1, 0, SW_IPC_UNIXSOCK), SW_OK);

test_func_stream_protocol(pool);
}

constexpr int magic_number = 99900011;
Expand Down
36 changes: 18 additions & 18 deletions core-tests/src/server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,24 +545,25 @@ TEST(server, task_worker) {
exit(2);
}

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(serv->get_task_count(), 1);
serv.onTask = [](Server *serv, EventData *task) -> int {
EXPECT_EQ(serv->get_tasking_num(), 1);
EXPECT_EQ(string(task->data, task->info.len), string(packet));
serv->gs->task_workers.running = 0;
serv->gs->task_count++;
serv->gs->tasking_num--;
return 0;
};

ASSERT_EQ(serv.create(), SW_OK);
ASSERT_EQ(serv.create_task_workers(), SW_OK);

thread t1([&serv]() {
SwooleWG.run_always = true;
serv.gs->task_workers.running = 1;
serv.gs->tasking_num++;
serv.gs->task_workers.main_loop(&serv.gs->task_workers, &serv.gs->task_workers.workers[0]);
EXPECT_EQ(serv.get_tasking_num(), 0);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_task_count(), 0);
serv.gs->tasking_num--;
EXPECT_EQ(serv.get_task_count(), 0);
EXPECT_EQ(serv.get_tasking_num(), 0);
EXPECT_EQ(serv.get_idle_task_worker_num(), serv.task_worker_num);
});

Expand All @@ -577,10 +578,13 @@ TEST(server, task_worker) {

int _dst_worker_id = 0;

ASSERT_GE(serv.gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
ASSERT_TRUE(serv.task(&buf, &_dst_worker_id));
ASSERT_EQ(serv.gs->task_count, 1);

t1.join();
serv.gs->task_workers.destroy();

ASSERT_EQ(serv.gs->task_count, 2);
}

// PHP_METHOD(swoole_server, task)
Expand All @@ -600,8 +604,7 @@ TEST(server, task_worker2) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand All @@ -623,7 +626,7 @@ TEST(server, task_worker2) {
memcpy(buf.data, packet, strlen(packet));
buf.info.reactor_id = worker->id;
buf.info.ext_flags |= (SW_TASK_NONBLOCK | SW_TASK_CALLBACK);
ASSERT_GE(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), 0);
ASSERT_EQ(serv->gs->task_workers.dispatch(&buf, &_dst_worker_id), SW_OK);
sleep(1);
kill(serv->gs->master_pid, SIGTERM);
}
Expand All @@ -649,8 +652,7 @@ TEST(server, task_worker3) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand Down Expand Up @@ -698,8 +700,7 @@ TEST(server, task_worker4) {

serv.onTask = [](Server *serv, swEventData *task) -> int {
EXPECT_EQ(string(task->data, task->info.len), string(packet));
int ret = serv->reply_task_result(task->data, task->info.len, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(task->data, task->info.len, 0, task));
return 0;
};

Expand All @@ -724,7 +725,7 @@ TEST(server, task_worker4) {
serv->gs->task_workers.dispatch(&buf, &_dst_worker_id);
sleep(1);

EventData *task_result = &(serv->task_result[swoole_get_process_id()]);
EventData *task_result = serv->get_task_result();
sw_memset_zero(task_result, sizeof(*task_result));
memset(&buf.info, 0, sizeof(buf.info));
buf.info.len = strlen(packet);
Expand Down Expand Up @@ -767,8 +768,7 @@ TEST(server, task_worker5) {
ifs.close();

EXPECT_EQ(string(resp), string(data));
int ret = serv->reply_task_result(resp, SW_IPC_MAX_SIZE * 2, 0, task);
EXPECT_GT(ret, 0);
EXPECT_TRUE(serv->finish(resp, SW_IPC_MAX_SIZE * 2, 0, task));
return 0;
};

Expand All @@ -779,7 +779,7 @@ TEST(server, task_worker5) {
if (worker->id == 1) {
int _dst_worker_id = 0;

EventData *task_result = &(serv->task_result[worker->id]);
EventData *task_result = &(serv->task_results[worker->id]);
sw_memset_zero(task_result, sizeof(*task_result));

File fp = make_tmpfile();
Expand Down
2 changes: 1 addition & 1 deletion ext-src/php_swoole_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ struct ServerObject {

struct TaskCo {
Coroutine *co;
int *list;
TaskId *list;
uint32_t count;
zval *result;
};
Expand Down
6 changes: 3 additions & 3 deletions ext-src/swoole_process_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,9 @@ static PHP_METHOD(swoole_process_pool, write) {
char *data;
size_t length;

if (zend_parse_parameters(ZEND_NUM_ARGS(), "s", &data, &length) == FAILURE) {
RETURN_FALSE;
}
ZEND_PARSE_PARAMETERS_START(1, 1)
Z_PARAM_STRING(data, length)
ZEND_PARSE_PARAMETERS_END_EX(RETURN_FALSE);

ProcessPool *pool = process_pool_get_and_check_pool(ZEND_THIS);
if (pool->ipc_mode != SW_IPC_SOCKET) {
Expand Down
Loading

0 comments on commit 7e30a33

Please sign in to comment.