-
Notifications
You must be signed in to change notification settings - Fork 379
/
ProtoEntitySerDe.java
148 lines (132 loc) · 5.9 KB
/
ProtoEntitySerDe.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
/*
* Copyright 2023 Datastrato.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.graviton.proto;
import com.datastrato.graviton.Entity;
import com.datastrato.graviton.EntitySerDe;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.protobuf.Any;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.Map;
/**
 * {@link EntitySerDe} implementation that converts entities to and from protobuf messages wrapped
 * in a {@link Any} envelope.
 *
 * <p>The entity-specific {@link ProtoSerDe} implementations and protobuf message classes are
 * looked up by class name in the static registries below, loaded reflectively on first use, and
 * then cached per instance. The caches are keyed by entity class only, so the class loader that
 * is passed on the first call for a given entity class is the one used to resolve its serde and
 * proto class for the lifetime of this instance.
 *
 * <p>The per-instance caches are plain hash maps; this class adds no synchronization of its own.
 */
public class ProtoEntitySerDe implements EntitySerDe {

  // Maps the canonical name of an entity class to the class name of its ProtoSerDe.
  // The implementation of different entities should also register its class to this map,
  // otherwise ProtoEntitySerDe will not be able to serialize or deserialize the entity.
  private static final Map<String, String> ENTITY_TO_SERDE =
      ImmutableMap.<String, String>builder()
          .put(
              "com.datastrato.graviton.meta.AuditInfo",
              "com.datastrato.graviton.proto.AuditInfoSerDe")
          .put(
              "com.datastrato.graviton.meta.BaseMetalake",
              "com.datastrato.graviton.proto.BaseMetalakeSerDe")
          .put(
              "com.datastrato.graviton.meta.CatalogEntity",
              "com.datastrato.graviton.proto.CatalogEntitySerDe")
          .put(
              "com.datastrato.graviton.meta.rel.BaseSchema",
              "com.datastrato.graviton.proto.SchemaEntitySerDe")
          .put(
              "com.datastrato.graviton.catalog.hive.HiveSchema",
              "com.datastrato.graviton.proto.SchemaEntitySerDe")
          .put(
              "com.datastrato.graviton.meta.rel.BaseTable",
              "com.datastrato.graviton.proto.TableEntitySerde")
          .put(
              "com.datastrato.graviton.catalog.hive.HiveTable",
              "com.datastrato.graviton.proto.TableEntitySerde")
          .build();

  // Maps the canonical name of an entity class to the class name of its protobuf message.
  // NOTE(review): HiveSchema and HiveTable are registered in ENTITY_TO_SERDE but not here, so
  // deserialize() for those classes fails with "No proto class found for entity ..." - confirm
  // whether that is intended.
  private static final Map<String, String> ENTITY_TO_PROTO =
      ImmutableMap.of(
          "com.datastrato.graviton.meta.AuditInfo",
          "com.datastrato.graviton.proto.AuditInfo",
          "com.datastrato.graviton.meta.BaseMetalake",
          "com.datastrato.graviton.proto.Metalake",
          "com.datastrato.graviton.meta.CatalogEntity",
          "com.datastrato.graviton.proto.Catalog",
          "com.datastrato.graviton.meta.rel.BaseSchema",
          "com.datastrato.graviton.proto.Schema",
          "com.datastrato.graviton.meta.rel.BaseTable",
          "com.datastrato.graviton.proto.Table");

  // Per-instance cache of instantiated serdes, keyed by entity class.
  private final Map<Class<? extends Entity>, ProtoSerDe<? extends Entity, ? extends Message>>
      entityToSerDe;

  // Per-instance cache of loaded protobuf message classes, keyed by entity class.
  private final Map<Class<? extends Entity>, Class<? extends Message>> entityToProto;

  /** Creates a serde with empty serde and proto-class caches. */
  public ProtoEntitySerDe() {
    this.entityToSerDe = Maps.newHashMap();
    this.entityToProto = Maps.newHashMap();
  }

  /**
   * Serializes an entity into the bytes of an {@link Any} that wraps the entity's protobuf
   * message.
   *
   * <p>The serde class is resolved with the current thread's context class loader; when the
   * thread has none, the class loader of this class is used instead.
   *
   * @param t the entity to serialize
   * @return the serialized bytes
   * @throws IOException if no serde is registered for the entity's class
   */
  @Override
  public <T extends Entity> byte[] serialize(T t) throws IOException {
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    if (classLoader == null) {
      // A null loader would make Class.forName use the bootstrap loader, which cannot see the
      // serde classes; fall back to the loader that loaded this class.
      classLoader = ProtoEntitySerDe.class.getClassLoader();
    }

    Any any = Any.pack(toProto(t, classLoader));
    return any.toByteArray();
  }

  /**
   * Deserializes bytes produced by {@link #serialize(Entity)} back into an entity.
   *
   * @param bytes the serialized {@link Any} bytes
   * @param clazz the entity class to produce
   * @param classLoader the class loader used to load the proto and serde classes
   * @return the deserialized entity
   * @throws IOException if the bytes cannot be parsed, no proto or serde class is registered for
   *     {@code clazz}, or the payload is not of the proto type registered for {@code clazz}
   */
  @Override
  public <T extends Entity> T deserialize(byte[] bytes, Class<T> clazz, ClassLoader classLoader)
      throws IOException {
    Any any = Any.parseFrom(bytes);
    Class<? extends Message> protoClass = getProtoClass(clazz, classLoader);

    // Reject payloads whose packed type does not match the proto registered for this entity.
    if (!any.is(protoClass)) {
      throw new IOException("Invalid proto for entity " + clazz.getName());
    }

    Message anyMessage = any.unpack(protoClass);
    return fromProto(anyMessage, clazz, classLoader);
  }

  /**
   * Returns the cached serde for the entity class, loading and instantiating it on first use.
   *
   * @throws IOException if no serde is registered for the entity class
   * @throws RuntimeException if the registered serde class cannot be loaded or instantiated
   */
  // The registry only guarantees the class name; the generic shape of the serde is trusted.
  @SuppressWarnings("unchecked")
  private <T extends Entity, M extends Message> ProtoSerDe<T, M> getProtoSerde(
      Class<T> entityClass, ClassLoader classLoader) throws IOException {
    // ImmutableMap never holds null values, so a null result means "not registered".
    String serdeClassName = ENTITY_TO_SERDE.get(entityClass.getCanonicalName());
    if (serdeClassName == null) {
      throw new IOException("No serde found for entity " + entityClass.getCanonicalName());
    }

    return (ProtoSerDe<T, M>)
        entityToSerDe.computeIfAbsent(
            entityClass,
            k -> {
              try {
                Class<? extends ProtoSerDe<? extends Entity, ? extends Message>> serdeClazz =
                    (Class<? extends ProtoSerDe<? extends Entity, ? extends Message>>)
                        loadClass(serdeClassName, classLoader);
                // Class.newInstance() is deprecated; go through the no-arg constructor instead.
                return serdeClazz.getDeclaredConstructor().newInstance();
              } catch (Exception e) {
                throw new RuntimeException(
                    "Failed to instantiate serde class "
                        + serdeClassName
                        + " for entity "
                        + k.getCanonicalName(),
                    e);
              }
            });
  }

  /**
   * Returns the cached protobuf message class for the entity class, loading it on first use.
   *
   * @throws IOException if no proto class is registered for the entity class
   * @throws RuntimeException if the registered proto class cannot be loaded
   */
  // The registry only guarantees the class name; the loaded class is trusted to be a Message.
  @SuppressWarnings("unchecked")
  private Class<? extends Message> getProtoClass(
      Class<? extends Entity> entityClass, ClassLoader classLoader) throws IOException {
    // ImmutableMap never holds null values, so a null result means "not registered".
    String protoClassName = ENTITY_TO_PROTO.get(entityClass.getCanonicalName());
    if (protoClassName == null) {
      throw new IOException("No proto class found for entity " + entityClass.getCanonicalName());
    }

    return entityToProto.computeIfAbsent(
        entityClass,
        k -> {
          try {
            return (Class<? extends Message>) loadClass(protoClassName, classLoader);
          } catch (Exception e) {
            throw new RuntimeException(
                "Failed to load proto class "
                    + protoClassName
                    + " for entity "
                    + k.getCanonicalName(),
                e);
          }
        });
  }

  /** Converts an entity to its protobuf message using the serde registered for its class. */
  @SuppressWarnings("unchecked")
  private <T extends Entity, M extends Message> M toProto(T t, ClassLoader classLoader)
      throws IOException {
    ProtoSerDe<T, M> protoSerDe = (ProtoSerDe<T, M>) getProtoSerde(t.getClass(), classLoader);
    return protoSerDe.serialize(t);
  }

  /** Converts a protobuf message to an entity using the serde registered for the entity class. */
  private <T extends Entity, M extends Message> T fromProto(
      M m, Class<T> entityClass, ClassLoader classLoader) throws IOException {
    ProtoSerDe<T, Message> protoSerDe = getProtoSerde(entityClass, classLoader);
    return protoSerDe.deserialize(m);
  }

  /**
   * Loads and initializes the named class with the given class loader.
   *
   * @throws IOException wrapping any failure raised while loading the class
   */
  private Class<?> loadClass(String className, ClassLoader classLoader) throws IOException {
    try {
      return Class.forName(className, true, classLoader);
    } catch (Exception e) {
      throw new IOException(
          "Failed to load class " + className + " with classLoader " + classLoader, e);
    }
  }
}