Skip to content

Commit

Permalink
Add container engine fix back (falcosecurity#128)
Browse files Browse the repository at this point in the history
* Revert "Revert "Merge upstream pr 688 (falcosecurity#121)" (falcosecurity#122)"

This reverts commit c8dbbf3.

This adds the fix back. I'll test with an agent PR that
updates/removes the tests.

* Add the ability to "defer" an async lookup

In some cases, the "server" code running run_impl might want to retry
its work until later. The current version can't do that--once a key is
dequeued using deque_next_key, it has to call store_value or lose the
request.

To make retries easier, add a method defer_lookup that pushes the
key (and optional value) back onto the request queue with a
configurable delay. After delay, the key will be pulled again with a
call to dequeue_next_key().

Signed-off-by: Mark Stemm <[email protected]>

* Use defer_lookup for container info retry instead of lookup_delayed

When the container async lookup class wants to retry a lookup, the
current version tries to use lookup_delayed to initiate a new request.

It turns out that that doesn't work--if there's already an existing
request in m_value_map, it assumes that the "server" doing run_impl
will eventually return an answer, and doesn't add a request to the
queue.

The solution is to use the newly added lookup_delayed instead, which
pushes the request back onto the queue with a short delay.

Signed-off-by: Mark Stemm <[email protected]>

* Use a separate max_wait_ms instead of re-using s_cri_timeout

Now that timeouts are working, it may take several seconds for
subsequent retries to complete. However, s_cri_timeout (typically 1
second) was being used for the max_wait_ms in cri_async_source. That
would mean that a lookup would expire before the server side had
retried the lookup.

The solution is to use a separate 10 second max_wait_ms, which matches
docker.

Signed-off-by: Mark Stemm <[email protected]>

Signed-off-by: Mark Stemm <[email protected]>
  • Loading branch information
mstemm authored Nov 17, 2022
1 parent f20935d commit e737c85
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
flavor: [ regular, bundled-deps, with-chisels, minimal ]
include:
- flavor: regular
build-args: '-DBUILD_BPF=On -DUSE_BUNDLED_DEPS=False'
build-args: '-DBUILD_BPF=On -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=True'
- flavor: bundled-deps
build-args: '-DBUILD_BPF=On -DUSE_BUNDLED_DEPS=True'
- flavor: with-chisels
Expand Down Expand Up @@ -50,6 +50,7 @@ jobs:
protobuf-compiler-grpc \
libgtest-dev \
libprotobuf-dev \
libre2-dev \
linux-headers-$(uname -r) \
&& apt-get clean
env:
Expand Down
23 changes: 19 additions & 4 deletions userspace/libsinsp/async/async_key_value_source.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ namespace libsinsp
* value source. Subclasses will override the the run_impl() method and
* implement the concrete value lookup behavior. In that method, subclasses
* will use use dequeue_next_key() method to get the key that it will use to
* collect the value(s), collect the appropriate value(s), and call the
* store_value() method to save the value. The run_impl() method should
* continue to dequeue and process values while the dequeue_next_key() method
* returns true.
* collect the value(s), collect the appropriate value(s), and either:
* - call the store_value() method to save the value.
* - call the defer_lookup() method to retry the lookup after a delay.
*
* The run_impl() method should continue to dequeue and process values
* while the dequeue_next_key() method returns true.
*
* The constructor for this class accepts a maximum wait time; this specifies
* how long client code is willing to wait for a synchronous response (i.e.,
Expand Down Expand Up @@ -242,6 +244,19 @@ class async_key_value_source
*/
void store_value(const key_type& key, const value_type& value);

/**
* Defer the lookup for the given key for delay ms. This puts
* the key back on the request queue with a deadline of now +
* delay ms to allow the run_impl thread to retry the lookup
* later.
*
* If value_ptr is non-NULL, the contents will be saved and provided
* to the next call of dequeue_next_key().
*/
void defer_lookup(const key_type& key,
value_type* value_ptr,
std::chrono::milliseconds delay);

/**
* Concrete subclasses must override this method to perform the
* asynchronous value lookup. The implementation should:
Expand Down
34 changes: 33 additions & 1 deletion userspace/libsinsp/async/async_key_value_source.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,8 @@ bool async_key_value_source<key_type, value_type>::dequeue_next_key(key_type& ke
if(!m_request_queue.empty())
{
auto top_element = m_request_queue.top();
if(top_element.first < std::chrono::steady_clock::now())
auto now = std::chrono::steady_clock::now();
if(top_element.first < now)
{
key_found = true;
key = std::move(top_element.second);
Expand All @@ -268,6 +269,14 @@ bool async_key_value_source<key_type, value_type>::dequeue_next_key(key_type& ke
*value_ptr = m_value_map[key].m_value;
}
}
else
{
std::chrono::duration<double> dur = top_element.first - now;
g_logger.log("async_key_value_source: Waiting " +
std::to_string(dur.count()) +
" before dequeuing top job",
sinsp_logger::SEV_DEBUG);
}
}

return key_found;
Expand Down Expand Up @@ -310,6 +319,29 @@ void async_key_value_source<key_type, value_type>::store_value(
}
}

template<typename key_type, typename value_type>
void async_key_value_source<key_type, value_type>::defer_lookup(
const key_type& key,
value_type* value_ptr,
std::chrono::milliseconds delay)
{
std::lock_guard<std::mutex> guard(m_mutex);

auto start_time = std::chrono::steady_clock::now() + delay;

g_logger.log("async_key_value_source: defer_lookup re-adding to request queue delay=" +
std::to_string(delay.count()),
sinsp_logger::SEV_DEBUG);

m_request_queue.push(std::make_pair(start_time, key));
m_request_set.insert(key);
if(value_ptr)
{
m_value_map[key].m_value = *value_ptr;
}
m_queue_not_empty_condition.notify_one();
}

/**
* Prune any "old" outstanding requests. This method expects that the caller
* is holding m_mutex.
Expand Down
14 changes: 14 additions & 0 deletions userspace/libsinsp/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,20 @@ void sinsp_container_manager::notify_new_container(const sinsp_container_info& c
}
else
{
// We don't log any warning when the inspector
// is doing its initial scan from /proc + any
// container lookups. Those don't have
// retries.
if(!container_info.is_successful() && m_inspector->m_inited)
{
// This means that the container
// engine made multiple attempts to
// look up the info and all attempts
// failed. Log that as a warning.
g_logger.format(sinsp_logger::SEV_WARNING,
"notify_new_container (%s): Saving empty container info after repeated failed lookups",
container_info.m_id.c_str());
}
add_container(std::make_shared<sinsp_container_info>(container_info), tinfo);
}
return;
Expand Down
38 changes: 20 additions & 18 deletions userspace/libsinsp/container_engine/container_async_source.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,21 +95,29 @@ void container_async_source<key_type>::run_impl()
while(this->dequeue_next_key(key, &res))
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"%s_async (%s): Source dequeued key",
"%s_async (%s): Source dequeued key attempt=%u",
name(),
container_id(key).c_str());
container_id(key).c_str(),
res.m_lookup.retry_no());

lookup_sync(key, res);

// For security reasons we store the value regardless of the lookup status on the
// first attempt, so we can track the container activity even without its metadata.
// For subsequent attempts we store it only if successful.
if(res.m_lookup.first_attempt() || res.m_lookup.is_successful())
if(!res.m_lookup.should_retry())
{
// Either the fetch was successful or the
// maximum number of retries have occurred.
if(!res.m_lookup.is_successful())
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"%s_async (%s): Could not look up container info after %u retries",
name(),
container_id(key).c_str(),
res.m_lookup.retry_no());
}

this->store_value(key, res);
}

if(res.m_lookup.should_retry())
else
{
// Make a new attempt
res.m_lookup.attempt_increment();
Expand All @@ -120,15 +128,9 @@ void container_async_source<key_type>::run_impl()
container_id(key).c_str(),
res.m_lookup.retry_no());

this->lookup_delayed(
key,
res,
std::chrono::milliseconds(res.m_lookup.delay()),
std::bind(
&container_async_source::source_callback,
this,
std::placeholders::_1,
std::placeholders::_2));
this->defer_lookup(key,
&res,
std::chrono::milliseconds(res.m_lookup.delay()));
}

// Reset res
Expand All @@ -137,4 +139,4 @@ void container_async_source<key_type>::run_impl()
}

} // namespace container_engine
} // namespace libsinsp
} // namespace libsinsp
9 changes: 8 additions & 1 deletion userspace/libsinsp/container_engine/cri.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ bool cri::resolve(sinsp_threadinfo *tinfo, bool query_os_for_missing_info)

if(!m_async_source)
{
auto async_source = new cri_async_source(cache, m_cri.get(), s_cri_timeout);
uint64_t max_wait_ms = 10000;
auto async_source = new cri_async_source(cache, m_cri.get(), max_wait_ms);
m_async_source = std::unique_ptr<cri_async_source>(async_source);
}

Expand All @@ -335,10 +336,16 @@ bool cri::resolve(sinsp_threadinfo *tinfo, bool query_os_for_missing_info)
const bool async = s_async && cache->async_allowed();
if(async)
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"cri_async (%s): Starting asynchronous lookup",
container_id.c_str());
done = m_async_source->lookup(key, result);
}
else
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"cri_async (%s): Starting synchronous lookup",
container_id.c_str());
done = m_async_source->lookup_sync(key, result);
}

Expand Down
6 changes: 6 additions & 0 deletions userspace/libsinsp/container_engine/docker/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ void docker_base::parse_docker(const docker_lookup_request& request, container_c
bool done;
if (cache->async_allowed())
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"docker_async (%s): Starting asynchronous lookup",
request.container_id.c_str());
done = m_docker_info_source->lookup(request, result);
}
else
{
g_logger.format(sinsp_logger::SEV_DEBUG,
"docker_async (%s): Starting synchronous lookup",
request.container_id.c_str());
done = m_docker_info_source->lookup_sync(request, result);
}
if (done)
Expand Down

0 comments on commit e737c85

Please sign in to comment.