Skip to content

Commit

Permalink
Refactor MemoryPoolListener
Browse files Browse the repository at this point in the history
Simplify to a functional interface and add queryId and memoryReservation
arg in preparation for per-query spill strategy.
  • Loading branch information
rschlussel committed Apr 23, 2021
1 parent 24243ad commit b6c170e
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.facebook.presto.operator.OperatorContext;
import com.facebook.presto.operator.PipelineContext;
import com.facebook.presto.operator.TaskContext;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.memory.MemoryPoolId;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig.TaskSpillingStrategy;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class MemoryRevokingScheduler
private final double memoryRevokingTarget;
private final TaskSpillingStrategy spillingStrategy;

private final MemoryPoolListener memoryPoolListener = MemoryPoolListener.onMemoryReserved(this::onMemoryReserved);
private final MemoryPoolListener memoryPoolListener = this::onMemoryReserved;

@Nullable
private ScheduledFuture<?> scheduledFuture;
Expand Down Expand Up @@ -152,7 +153,7 @@ void registerPoolListeners()
memoryPools.forEach(memoryPool -> memoryPool.addListener(memoryPoolListener));
}

private void onMemoryReserved(MemoryPool memoryPool)
private void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryMemoryReservation)
{
try {
if (!memoryRevokingNeeded(memoryPool)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,14 @@ public ListenableFuture<?> reserve(QueryId queryId, String allocationTag, long b
}
}

onMemoryReserved();
onMemoryReserved(queryId);
return result;
}

private void onMemoryReserved()
private void onMemoryReserved(QueryId queryId)
{
listeners.forEach(listener -> listener.onMemoryReserved(this));
long totalMemoryReservation = queryMemoryReservations.getOrDefault(queryId, 0L) + queryMemoryRevocableReservations.getOrDefault(queryId, 0L);
listeners.forEach(listener -> listener.onMemoryReserved(this, queryId, totalMemoryReservation));
}

public void onTaskMemoryReserved(TaskId taskId)
Expand Down Expand Up @@ -181,7 +182,7 @@ public ListenableFuture<?> reserveRevocable(QueryId queryId, long bytes)
}
}

onMemoryReserved();
onMemoryReserved(queryId);
return result;
}

Expand All @@ -202,7 +203,7 @@ public boolean tryReserve(QueryId queryId, String allocationTag, long bytes)
}
}

onMemoryReserved();
onMemoryReserved(queryId);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,17 @@
*/
package com.facebook.presto.memory;

import java.util.function.Consumer;
import com.facebook.presto.spi.QueryId;

@FunctionalInterface
public interface MemoryPoolListener
{
/**
* Invoked when memory reservation completes successfully.
*
* @param memoryPool the {@link MemoryPool} where the reservation took place
* @param memoryPool the {@link MemoryPool} where the reservation took place
* @param queryId the {@link QueryId} of the query that reserved the memory
* @param queryMemoryReservation the total amount of memory reserved by the query (revocable and regular)
*/
void onMemoryReserved(MemoryPool memoryPool);

/**
* Creates {@link MemoryPoolListener} implementing {@link #onMemoryReserved(MemoryPool)} only.
*/
static MemoryPoolListener onMemoryReserved(Consumer<? super MemoryPool> action)
{
return action::accept;
}
void onMemoryReserved(MemoryPool memoryPool, QueryId queryId, long queryMemoryReservation);
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ public void testNotifyListenerOnMemoryReserved()
setupConsumeRevocableMemory(ONE_BYTE, 10);
AtomicReference<MemoryPool> notifiedPool = new AtomicReference<>();
AtomicLong notifiedBytes = new AtomicLong();
userPool.addListener(MemoryPoolListener.onMemoryReserved(pool -> {
userPool.addListener((pool, queryId, memoryReservation) -> {
notifiedPool.set(pool);
notifiedBytes.set(pool.getReservedBytes());
}));
});

userPool.reserve(fakeQueryId, "test", 3);
assertEquals(notifiedPool.get(), userPool);
Expand Down

0 comments on commit b6c170e

Please sign in to comment.