Skip to content

Commit

Permalink
Add Drift serialization compatibility for session functions
Browse files Browse the repository at this point in the history
  • Loading branch information
tdcmeehan committed Apr 19, 2021
1 parent 684770c commit 3ab2e8e
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ public Map<String, String> getPreparedStatements()
return preparedStatements;
}

@ThriftField(23)
@JsonProperty
public Map<SqlFunctionId, SqlInvokedFunction> getSessionFunctions()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.PageIndexerFactory;
import com.facebook.presto.spi.PageSorter;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.spi.relation.DeterminismEvaluator;
import com.facebook.presto.spi.relation.DomainTranslator;
import com.facebook.presto.spi.relation.PredicateCompiler;
Expand Down Expand Up @@ -325,6 +326,11 @@ else if (serverConfig.isCoordinator()) {
jaxrsBinder(binder).bind(TaskExecutorResource.class);
newExporter(binder).export(TaskExecutorResource.class).withGeneratedName();
binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON);

install(new DefaultThriftCodecsModule());
thriftCodecBinder(binder).bindCustomThriftCodec(SqlInvokedFunctionCodec.class);
thriftCodecBinder(binder).bindCustomThriftCodec(SqlFunctionIdCodec.class);

binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON);
binder.bind(TaskManager.class).to(Key.get(SqlTaskManager.class));
binder.bind(SpoolingOutputBufferFactory.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -383,6 +389,7 @@ else if (serverConfig.isCoordinator()) {
jsonCodecBinder(binder).bindJsonCodec(OperatorStats.class);
jsonCodecBinder(binder).bindJsonCodec(ExecutionFailureInfo.class);
jsonCodecBinder(binder).bindJsonCodec(TableCommitContext.class);
jsonCodecBinder(binder).bindJsonCodec(SqlInvokedFunction.class);
smileCodecBinder(binder).bindSmileCodec(TaskStatus.class);
smileCodecBinder(binder).bindSmileCodec(TaskInfo.class);
thriftCodecBinder(binder).bindThriftCodec(TaskStatus.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.metadata.ThriftType;
import com.facebook.drift.protocol.TProtocolReader;
import com.facebook.drift.protocol.TProtocolWriter;
import com.facebook.presto.spi.function.SqlFunctionId;

// TODO: convert SqlFunctionId to be a native Thrift struct
public class SqlFunctionIdCodec
implements ThriftCodec<SqlFunctionId>
{
public ThriftType getType()
{
return new ThriftType(ThriftType.STRING, SqlFunctionId.class);
}

@Override
public SqlFunctionId read(TProtocolReader protocol)
throws Exception
{
return SqlFunctionId.parseSqlFunctionId(protocol.readString());
}

@Override
public void write(SqlFunctionId value, TProtocolWriter protocol)
throws Exception
{
protocol.writeString(value.toJsonString());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.drift.codec.ThriftCodec;
import com.facebook.drift.codec.metadata.ThriftType;
import com.facebook.drift.protocol.TProtocolReader;
import com.facebook.drift.protocol.TProtocolWriter;
import com.facebook.presto.spi.function.SqlInvokedFunction;

import javax.inject.Inject;

import static java.util.Objects.requireNonNull;

// TODO: convert SqlInvokedFunction to be a native Thrift struct
public class SqlInvokedFunctionCodec
implements ThriftCodec<SqlInvokedFunction>
{
private final JsonCodec<SqlInvokedFunction> jsonCodec;

@Inject
public SqlInvokedFunctionCodec(JsonCodec<SqlInvokedFunction> jsonCodec)
{
this.jsonCodec = requireNonNull(jsonCodec, "jsonCodec is null");
}

public ThriftType getType()
{
return new ThriftType(ThriftType.STRING, SqlInvokedFunction.class);
}

@Override
public SqlInvokedFunction read(TProtocolReader protocol)
throws Exception
{
return jsonCodec.fromJson(protocol.readString());
}

@Override
public void write(SqlInvokedFunction value, TProtocolWriter protocol)
throws Exception
{
protocol.writeString(jsonCodec.toJson(value));
}
}

0 comments on commit 3ab2e8e

Please sign in to comment.