Skip to content

Commit

Permalink
src: remove usage of AllocatedBuffer from stream_*
Browse files Browse the repository at this point in the history
Signed-off-by: Darshan Sen <[email protected]>

PR-URL: #40293
Reviewed-By: James M Snell <[email protected]>
Reviewed-By: Anna Henningsen <[email protected]>
  • Loading branch information
RaisinTen committed Oct 12, 2021
1 parent 15ce81a commit a784258
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 56 deletions.
7 changes: 3 additions & 4 deletions src/stream_base-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "allocated_buffer-inl.h"
#include "async_wrap-inl.h"
#include "base_object-inl.h"
#include "node.h"
Expand Down Expand Up @@ -270,9 +269,9 @@ ShutdownWrap* ShutdownWrap::FromObject(
return FromObject(base_obj->object());
}

void WriteWrap::SetAllocatedStorage(AllocatedBuffer&& storage) {
CHECK_NULL(storage_.data());
storage_ = std::move(storage);
void WriteWrap::SetBackingStore(std::unique_ptr<v8::BackingStore> bs) {
CHECK(!backing_store_);
backing_store_ = std::move(bs);
}

void StreamReq::Done(int status, const char* error_str) {
Expand Down
89 changes: 49 additions & 40 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace node {

using v8::Array;
using v8::ArrayBuffer;
using v8::BackingStore;
using v8::ConstructorBehavior;
using v8::Context;
using v8::DontDelete;
Expand All @@ -29,6 +30,7 @@ using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Isolate;
using v8::Local;
using v8::MaybeLocal;
using v8::Object;
Expand Down Expand Up @@ -80,6 +82,8 @@ void StreamBase::SetWriteResult(const StreamWriteResult& res) {

int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();
Local<Context> context = env->context();

CHECK(args[0]->IsObject());
CHECK(args[1]->IsArray());
Expand All @@ -102,21 +106,21 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
if (!all_buffers) {
// Determine storage size first
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();

if (Buffer::HasInstance(chunk))
continue;
// Buffer chunk, no additional storage required

// String chunk
Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
size_t chunk_size;
if (encoding == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(env->isolate(), string, encoding).To(&chunk_size))
!StringBytes::Size(isolate, string, encoding).To(&chunk_size))
return 0;
else if (!StringBytes::StorageSize(env->isolate(), string, encoding)
else if (!StringBytes::StorageSize(isolate, string, encoding)
.To(&chunk_size))
return 0;
storage_size += chunk_size;
Expand All @@ -126,20 +130,22 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
return UV_ENOBUFS;
} else {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i).ToLocalChecked();
bufs[i].base = Buffer::Data(chunk);
bufs[i].len = Buffer::Length(chunk);
}
}

AllocatedBuffer storage;
if (storage_size > 0)
storage = AllocatedBuffer::AllocateManaged(env, storage_size);
std::unique_ptr<BackingStore> bs;
if (storage_size > 0) {
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
}

offset = 0;
if (!all_buffers) {
for (size_t i = 0; i < count; i++) {
Local<Value> chunk = chunks->Get(env->context(), i * 2).ToLocalChecked();
Local<Value> chunk = chunks->Get(context, i * 2).ToLocalChecked();

// Write buffer
if (Buffer::HasInstance(chunk)) {
Expand All @@ -150,13 +156,14 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

// Write string
CHECK_LE(offset, storage_size);
char* str_storage = storage.data() + offset;
size_t str_size = storage.size() - offset;

Local<String> string = chunk->ToString(env->context()).ToLocalChecked();
enum encoding encoding = ParseEncoding(env->isolate(),
chunks->Get(env->context(), i * 2 + 1).ToLocalChecked());
str_size = StringBytes::Write(env->isolate(),
char* str_storage =
static_cast<char*>(bs ? bs->Data() : nullptr) + offset;
size_t str_size = (bs ? bs->ByteLength() : 0) - offset;

Local<String> string = chunk->ToString(context).ToLocalChecked();
enum encoding encoding = ParseEncoding(isolate,
chunks->Get(context, i * 2 + 1).ToLocalChecked());
str_size = StringBytes::Write(isolate,
str_storage,
str_size,
string,
Expand All @@ -169,9 +176,8 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(std::move(storage));
}
if (res.wrap != nullptr && storage_size > 0)
res.wrap->SetBackingStore(std::move(bs));
return res.err;
}

Expand Down Expand Up @@ -216,6 +222,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
template <enum encoding enc>
int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
Isolate* isolate = env->isolate();
CHECK(args[0]->IsObject());
CHECK(args[1]->IsString());

Expand All @@ -230,9 +237,9 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// computing their actual size, rather than tripling the storage.
size_t storage_size;
if (enc == UTF8 && string->Length() > 65535 &&
!StringBytes::Size(env->isolate(), string, enc).To(&storage_size))
!StringBytes::Size(isolate, string, enc).To(&storage_size))
return 0;
else if (!StringBytes::StorageSize(env->isolate(), string, enc)
else if (!StringBytes::StorageSize(isolate, string, enc)
.To(&storage_size))
return 0;

Expand All @@ -248,7 +255,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
bool try_write = storage_size <= sizeof(stack_storage) &&
(!IsIPCPipe() || send_handle_obj.IsEmpty());
if (try_write) {
data_size = StringBytes::Write(env->isolate(),
data_size = StringBytes::Write(isolate,
stack_storage,
storage_size,
string,
Expand All @@ -274,26 +281,28 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
CHECK_EQ(count, 1);
}

AllocatedBuffer data;
std::unique_ptr<BackingStore> bs;

if (try_write) {
// Copy partial data
data = AllocatedBuffer::AllocateManaged(env, buf.len);
memcpy(data.data(), buf.base, buf.len);
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, buf.len);
memcpy(static_cast<char*>(bs->Data()), buf.base, buf.len);
data_size = buf.len;
} else {
// Write it
data = AllocatedBuffer::AllocateManaged(env, storage_size);
data_size = StringBytes::Write(env->isolate(),
data.data(),
NoArrayBufferZeroFillScope no_zero_fill_scope(env->isolate_data());
bs = ArrayBuffer::NewBackingStore(isolate, storage_size);
data_size = StringBytes::Write(isolate,
static_cast<char*>(bs->Data()),
storage_size,
string,
enc);
}

CHECK_LE(data_size, storage_size);

buf = uv_buf_init(data.data(), data_size);
buf = uv_buf_init(static_cast<char*>(bs->Data()), data_size);

uv_stream_t* send_handle = nullptr;

Expand All @@ -312,9 +321,8 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
res.bytes += synchronously_written;

SetWriteResult(res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(std::move(data));
}
if (res.wrap != nullptr)
res.wrap->SetBackingStore(std::move(bs));

return res.err;
}
Expand Down Expand Up @@ -511,27 +519,28 @@ void StreamResource::ClearError() {
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
CHECK_NOT_NULL(stream_);
Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
return AllocatedBuffer::AllocateManaged(env, suggested_size).release();
return env->allocate_managed_buffer(suggested_size);
}

void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
CHECK_NOT_NULL(stream_);
StreamBase* stream = static_cast<StreamBase*>(stream_);
Environment* env = stream->stream_env();
HandleScope handle_scope(env->isolate());
Isolate* isolate = env->isolate();
HandleScope handle_scope(isolate);
Context::Scope context_scope(env->context());
AllocatedBuffer buf(env, buf_);
std::unique_ptr<BackingStore> bs = env->release_managed_buffer(buf_);

if (nread <= 0) {
if (nread < 0)
stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
return;
}

CHECK_LE(static_cast<size_t>(nread), buf.size());
buf.Resize(nread);
CHECK_LE(static_cast<size_t>(nread), bs->ByteLength());
bs = BackingStore::Reallocate(isolate, std::move(bs), nread);

stream->CallJSOnreadMethod(nread, buf.ToArrayBuffer());
stream->CallJSOnreadMethod(nread, ArrayBuffer::New(isolate, std::move(bs)));
}


Expand Down
5 changes: 2 additions & 3 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "env.h"
#include "allocated_buffer.h"
#include "async_wrap.h"
#include "node.h"
#include "util.h"
Expand Down Expand Up @@ -90,7 +89,7 @@ class ShutdownWrap : public StreamReq {

class WriteWrap : public StreamReq {
public:
inline void SetAllocatedStorage(AllocatedBuffer&& storage);
inline void SetBackingStore(std::unique_ptr<v8::BackingStore> bs);

inline WriteWrap(
StreamBase* stream,
Expand All @@ -105,7 +104,7 @@ class WriteWrap : public StreamReq {
void OnDone(int status) override;

private:
AllocatedBuffer storage_;
std::unique_ptr<v8::BackingStore> backing_store_;
};


Expand Down
15 changes: 8 additions & 7 deletions src/stream_pipe.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include "stream_pipe.h"
#include "allocated_buffer-inl.h"
#include "stream_base-inl.h"
#include "node_buffer.h"
#include "util-inl.h"

namespace node {

using v8::BackingStore;
using v8::Context;
using v8::Function;
using v8::FunctionCallbackInfo;
Expand Down Expand Up @@ -118,13 +118,13 @@ uv_buf_t StreamPipe::ReadableListener::OnStreamAlloc(size_t suggested_size) {
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
size_t size = std::min(suggested_size, pipe->wanted_data_);
CHECK_GT(size, 0);
return AllocatedBuffer::AllocateManaged(pipe->env(), size).release();
return pipe->env()->allocate_managed_buffer(size);
}

void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
const uv_buf_t& buf_) {
StreamPipe* pipe = ContainerOf(&StreamPipe::readable_listener_, this);
AllocatedBuffer buf(pipe->env(), buf_);
std::unique_ptr<BackingStore> bs = pipe->env()->release_managed_buffer(buf_);
if (nread < 0) {
// EOF or error; stop reading and pass the error to the previous listener
// (which might end up in JS).
Expand All @@ -144,19 +144,20 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
return;
}

pipe->ProcessData(nread, std::move(buf));
pipe->ProcessData(nread, std::move(bs));
}

void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) {
void StreamPipe::ProcessData(size_t nread,
std::unique_ptr<BackingStore> bs) {
CHECK(uses_wants_write_ || pending_writes_ == 0);
uv_buf_t buffer = uv_buf_init(buf.data(), nread);
uv_buf_t buffer = uv_buf_init(static_cast<char*>(bs->Data()), nread);
StreamWriteResult res = sink()->Write(&buffer, 1);
pending_writes_++;
if (!res.async) {
writable_listener_.OnStreamAfterWrite(nullptr, res.err);
} else {
is_reading_ = false;
res.wrap->SetAllocatedStorage(std::move(buf));
res.wrap->SetBackingStore(std::move(bs));
if (source() != nullptr)
source()->ReadStop();
}
Expand Down
3 changes: 1 addition & 2 deletions src/stream_pipe.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "stream_base.h"
#include "allocated_buffer.h"

namespace node {

Expand Down Expand Up @@ -43,7 +42,7 @@ class StreamPipe : public AsyncWrap {
// `OnStreamWantsWrite()` support.
size_t wanted_data_ = 0;

void ProcessData(size_t nread, AllocatedBuffer&& buf);
void ProcessData(size_t nread, std::unique_ptr<v8::BackingStore> bs);

class ReadableListener : public StreamListener {
public:
Expand Down

0 comments on commit a784258

Please sign in to comment.