Skip to content

Commit

Permalink
Refactor the object cache to better account for race conditions (#13204)
Browse files Browse the repository at this point in the history
Supersedes  #13075

Closes #13204

COPYBARA_INTEGRATE_REVIEW=#13204 from fowles:main 9b6aad6
PiperOrigin-RevId: 546579459
  • Loading branch information
fowles authored and copybara-github committed Jul 8, 2023
1 parent c2f07bb commit a01d047
Show file tree
Hide file tree
Showing 11 changed files with 244 additions and 160 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ src/js_embed

# vim generated
*.swp
*~

# Generated test scaffolding
src/no_warning_test.cc
Expand Down
4 changes: 1 addition & 3 deletions ruby/ext/google/protobuf_c/defs.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,7 @@ static VALUE DescriptorPool_alloc(VALUE klass) {

RB_OBJ_WRITE(ret, &self->def_to_descriptor, rb_hash_new());
self->symtab = upb_DefPool_New();
ObjectCache_Add(self->symtab, ret);

return ret;
return ObjectCache_TryAdd(self->symtab, ret);
}

/*
Expand Down
6 changes: 4 additions & 2 deletions ruby/ext/google/protobuf_c/map.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ VALUE Map_GetRubyWrapper(upb_Map* map, upb_CType key_type, TypeInfo value_type,
if (val == Qnil) {
val = Map_alloc(cMap);
Map* self;
ObjectCache_Add(map, val);
TypedData_Get_Struct(val, Map, &Map_type, self);
self->map = map;
self->arena = arena;
Expand All @@ -103,6 +102,7 @@ VALUE Map_GetRubyWrapper(upb_Map* map, upb_CType key_type, TypeInfo value_type,
const upb_MessageDef* val_m = self->value_type_info.def.msgdef;
self->value_type_class = Descriptor_DefToClass(val_m);
}
return ObjectCache_TryAdd(map, val);
}

return val;
Expand Down Expand Up @@ -319,7 +319,9 @@ static VALUE Map_init(int argc, VALUE* argv, VALUE _self) {

self->map = upb_Map_New(Arena_get(self->arena), self->key_type,
self->value_type_info.type);
ObjectCache_Add(self->map, _self);
VALUE stored = ObjectCache_TryAdd(self->map, _self);
(void)stored;
PBRUBY_ASSERT(stored == _self);

if (init_arg != Qnil) {
Map_merge_into_self(_self, init_arg);
Expand Down
4 changes: 3 additions & 1 deletion ruby/ext/google/protobuf_c/message.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ void Message_InitPtr(VALUE self_, upb_Message* msg, VALUE arena) {
Message* self = ruby_to_Message(self_);
self->msg = msg;
RB_OBJ_WRITE(self_, &self->arena, arena);
ObjectCache_Add(msg, self_);
VALUE stored = ObjectCache_TryAdd(msg, self_);
(void)stored;
PBRUBY_ASSERT(stored == self_);
}

VALUE Message_GetArena(VALUE msg_rb) {
Expand Down
172 changes: 24 additions & 148 deletions ruby/ext/google/protobuf_c/protobuf.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ const rb_data_type_t Arena_type = {
.flags = RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED,
};

static void* ruby_upb_allocfunc(upb_alloc* alloc, void* ptr, size_t oldsize, size_t size) {
static void *ruby_upb_allocfunc(upb_alloc *alloc, void *ptr, size_t oldsize,
size_t size) {
if (size == 0) {
xfree(ptr);
return NULL;
Expand Down Expand Up @@ -252,164 +253,40 @@ void Arena_register(VALUE module) {
// Object Cache
// -----------------------------------------------------------------------------

// A pointer -> Ruby Object cache that keeps references to Ruby wrapper
// objects. This allows us to look up any Ruby wrapper object by the address
// of the object it is wrapping. That way we can avoid ever creating two
// different wrapper objects for the same C object, which saves memory and
// preserves object identity.
//
// We use WeakMap for the cache. For Ruby <2.7 we also need a secondary Hash
// to store WeakMap keys because Ruby <2.7 WeakMap doesn't allow non-finalizable
// keys.
//
// We also need the secondary Hash if sizeof(long) < sizeof(VALUE), because this
// means it may not be possible to fit a pointer into a Fixnum. Keys are
// pointers, and if they fit into a Fixnum, Ruby doesn't collect them, but if
// they overflow and require allocating a Bignum, they could get collected
// prematurely, thus removing the cache entry. This happens on 64-bit Windows,
// on which pointers are 64 bits but longs are 32 bits. In this case, we enable
// the secondary Hash to hold the keys and prevent them from being collected.

#if RUBY_API_VERSION_CODE >= 20700 && SIZEOF_LONG >= SIZEOF_VALUE
#define USE_SECONDARY_MAP 0
#else
#define USE_SECONDARY_MAP 1
#endif

#if USE_SECONDARY_MAP

// Maps Numeric -> Object. The object is then used as a key into the WeakMap.
// This is needed for Ruby <2.7 where a number cannot be a key to WeakMap.
// The object is used only for its identity; it does not contain any data.
VALUE secondary_map = Qnil;

// Mutations to the map are under a mutex, because SeconaryMap_MaybeGC()
// iterates over the map which cannot happen in parallel with insertions, or
// Ruby will throw:
// can't add a new key into hash during iteration (RuntimeError)
VALUE secondary_map_mutex = Qnil;

// Lambda that will GC entries from the secondary map that are no longer present
// in the primary map.
VALUE gc_secondary_map_lambda = Qnil;
ID length;

extern VALUE weak_obj_cache;

static void SecondaryMap_Init() {
rb_gc_register_address(&secondary_map);
rb_gc_register_address(&gc_secondary_map_lambda);
rb_gc_register_address(&secondary_map_mutex);
secondary_map = rb_hash_new();
gc_secondary_map_lambda = rb_eval_string(
"->(secondary, weak) {\n"
" secondary.delete_if { |k, v| !weak.key?(v) }\n"
"}\n");
secondary_map_mutex = rb_mutex_new();
length = rb_intern("length");
}

// The secondary map is a regular Hash, and will never shrink on its own.
// The main object cache is a WeakMap that will automatically remove entries
// when the target object is no longer reachable, but unless we manually
// remove the corresponding entries from the secondary map, it will grow
// without bound.
//
// To avoid this unbounded growth we periodically remove entries from the
// secondary map that are no longer present in the WeakMap. The logic of
// how often to perform this GC is an artbirary tuning parameter that
// represents a straightforward CPU/memory tradeoff.
//
// Requires: secondary_map_mutex is held.
static void SecondaryMap_MaybeGC() {
PBRUBY_ASSERT(rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
size_t weak_len = NUM2ULL(rb_funcall(weak_obj_cache, length, 0));
size_t secondary_len = RHASH_SIZE(secondary_map);
if (secondary_len < weak_len) {
// Logically this case should not be possible: a valid entry cannot exist in
// the weak table unless there is a corresponding entry in the secondary
// table. It should *always* be the case that secondary_len >= weak_len.
//
// However ObjectSpace::WeakMap#length (and therefore weak_len) is
// unreliable: it overreports its true length by including non-live objects.
// However these non-live objects are not yielded in iteration, so we may
// have previously deleted them from the secondary map in a previous
// invocation of SecondaryMap_MaybeGC().
//
// In this case, we can't measure any waste, so we just return.
return;
}
size_t waste = secondary_len - weak_len;
// GC if we could remove at least 2000 entries or 20% of the table size
// (whichever is greater). Since the cost of the GC pass is O(N), we
// want to make sure that we condition this on overall table size, to
// avoid O(N^2) CPU costs.
size_t threshold = PBRUBY_MAX(secondary_len * 0.2, 2000);
if (waste > threshold) {
rb_funcall(gc_secondary_map_lambda, rb_intern("call"), 2, secondary_map,
weak_obj_cache);
}
}

// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE SecondaryMap_Get(VALUE key, bool create) {
PBRUBY_ASSERT(!create || rb_mutex_locked_p(secondary_map_mutex) == Qtrue);
VALUE ret = rb_hash_lookup(secondary_map, key);
if (ret == Qnil && create) {
SecondaryMap_MaybeGC();
ret = rb_class_new_instance(0, NULL, rb_cObject);
rb_hash_aset(secondary_map, key, ret);
}
return ret;
}

#endif

// Requires: secondary_map_mutex is held by this thread iff create == true.
static VALUE ObjectCache_GetKey(const void *key, bool create) {
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
VALUE ret = LL2NUM(key_val >> 2);
#if USE_SECONDARY_MAP
ret = SecondaryMap_Get(ret, create);
#endif
return ret;
}

// Public ObjectCache API.

VALUE weak_obj_cache = Qnil;
ID item_get;
ID item_set;
ID item_try_add;

static void ObjectCache_Init(VALUE protobuf) {
item_get = rb_intern("get");
item_try_add = rb_intern("try_add");

static void ObjectCache_Init() {
rb_gc_register_address(&weak_obj_cache);
VALUE klass = rb_eval_string("ObjectSpace::WeakMap");
weak_obj_cache = rb_class_new_instance(0, NULL, klass);
item_get = rb_intern("[]");
item_set = rb_intern("[]=");
#if USE_SECONDARY_MAP
SecondaryMap_Init();
#if RUBY_API_VERSION_CODE >= 20700 && SIZEOF_LONG >= SIZEOF_VALUE
VALUE cache_class = rb_const_get(protobuf, rb_intern("ObjectCache"));
#else
VALUE cache_class = rb_const_get(protobuf, rb_intern("LegacyObjectCache"));
#endif

weak_obj_cache = rb_class_new_instance(0, NULL, cache_class);
rb_const_set(protobuf, rb_intern("OBJECT_CACHE"), weak_obj_cache);
rb_const_set(protobuf, rb_intern("SIZEOF_LONG"), INT2NUM(SIZEOF_LONG));
rb_const_set(protobuf, rb_intern("SIZEOF_VALUE"), INT2NUM(SIZEOF_VALUE));
}

void ObjectCache_Add(const void *key, VALUE val) {
PBRUBY_ASSERT(ObjectCache_Get(key) == Qnil);
#if USE_SECONDARY_MAP
rb_mutex_lock(secondary_map_mutex);
#endif
VALUE key_rb = ObjectCache_GetKey(key, true);
rb_funcall(weak_obj_cache, item_set, 2, key_rb, val);
#if USE_SECONDARY_MAP
rb_mutex_unlock(secondary_map_mutex);
#endif
PBRUBY_ASSERT(ObjectCache_Get(key) == val);
VALUE ObjectCache_TryAdd(const void *key, VALUE val) {
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
return rb_funcall(weak_obj_cache, item_try_add, 2, LL2NUM(key_val), val);
}

// Returns the cached object for this key, if any. Otherwise returns Qnil.
VALUE ObjectCache_Get(const void *key) {
VALUE key_rb = ObjectCache_GetKey(key, false);
return rb_funcall(weak_obj_cache, item_get, 1, key_rb);
VALUE key_val = (VALUE)key;
PBRUBY_ASSERT((key_val & 3) == 0);
return rb_funcall(weak_obj_cache, item_get, 1, LL2NUM(key_val));
}

/*
Expand Down Expand Up @@ -459,11 +336,10 @@ VALUE Google_Protobuf_deep_copy(VALUE self, VALUE obj) {
// This must be named "Init_protobuf_c" because the Ruby module is named
// "protobuf_c" -- the VM looks for this symbol in our .so.
__attribute__((visibility("default"))) void Init_protobuf_c() {
ObjectCache_Init();

VALUE google = rb_define_module("Google");
VALUE protobuf = rb_define_module_under(google, "Protobuf");

ObjectCache_Init(protobuf);
Arena_register(protobuf);
Defs_register(protobuf);
RepeatedField_register(protobuf);
Expand Down
7 changes: 3 additions & 4 deletions ruby/ext/google/protobuf_c/protobuf.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,9 @@ void Arena_Pin(VALUE arena, VALUE obj);
// being collected (though in Ruby <2.7 is it effectively strong, due to
// implementation limitations).

// Adds an entry to the cache. The "arena" parameter must give the arena that
// "key" was allocated from. In Ruby <2.7.0, it will be used to remove the key
// from the cache when the arena is destroyed.
void ObjectCache_Add(const void* key, VALUE val);
// Tries to add a new entry to the cache, returning the newly installed value or
// the pre-existing entry.
VALUE ObjectCache_TryAdd(const void* key, VALUE val);

// Returns the cached object for this key, if any. Otherwise returns Qnil.
VALUE ObjectCache_Get(const void* key);
Expand Down
5 changes: 3 additions & 2 deletions ruby/ext/google/protobuf_c/repeated_field.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ VALUE RepeatedField_GetRubyWrapper(upb_Array* array, TypeInfo type_info,
if (val == Qnil) {
val = RepeatedField_alloc(cRepeatedField);
RepeatedField* self;
ObjectCache_Add(array, val);
TypedData_Get_Struct(val, RepeatedField, &RepeatedField_type, self);
self->array = array;
self->arena = arena;
self->type_info = type_info;
if (self->type_info.type == kUpb_CType_Message) {
self->type_class = Descriptor_DefToClass(type_info.def.msgdef);
}
val = ObjectCache_TryAdd(array, val);
}

PBRUBY_ASSERT(ruby_to_RepeatedField(val)->type_info.type == type_info.type);
Expand Down Expand Up @@ -615,7 +615,8 @@ VALUE RepeatedField_init(int argc, VALUE* argv, VALUE _self) {

self->type_info = TypeInfo_FromClass(argc, argv, 0, &self->type_class, &ary);
self->array = upb_Array_New(arena, self->type_info.type);
ObjectCache_Add(self->array, _self);
VALUE stored_val = ObjectCache_TryAdd(self->array, _self);
PBRUBY_ASSERT(stored_val == _self);

if (ary != Qnil) {
if (!RB_TYPE_P(ary, T_ARRAY)) {
Expand Down
1 change: 1 addition & 0 deletions ruby/lib/google/protobuf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

# require mixins before we hook them into the java & c code
require 'google/protobuf/message_exts'
require 'google/protobuf/object_cache'

# We define these before requiring the platform-specific modules.
# That way the module init can grab references to these.
Expand Down
Loading

0 comments on commit a01d047

Please sign in to comment.