From 5fd93a7765d4923cffb426b6d159e44ae88799ef Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 25 Dec 2019 15:44:07 +0800 Subject: [PATCH 01/15] backup --- idl/base.thrift | 7 + .../infra/pegasus/base/request_meta.java | 677 ++++++++++++++++++ .../pegasus/operator/client_operator.java | 25 +- .../infra/pegasus/rpc/ThriftHeader.java | 22 +- .../pegasus/rpc/async/ThriftFrameEncoder.java | 10 +- 5 files changed, 712 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java diff --git a/idl/base.thrift b/idl/base.thrift index 7728e6c6..649db19c 100644 --- a/idl/base.thrift +++ b/idl/base.thrift @@ -22,3 +22,10 @@ struct rpc_address struct gpid { } + +struct request_meta { + 1:i32 app_id; + 2:i32 partition_index; + 3:i32 client_timeout; + 4:i64 partition_hash; +} \ No newline at end of file diff --git a/src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java b/src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java new file mode 100644 index 00000000..a7228d82 --- /dev/null +++ b/src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java @@ -0,0 +1,677 @@ +/** + * Autogenerated by Thrift + * + *

DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * + * @generated + */ +package com.xiaomi.infra.pegasus.base; + +import com.xiaomi.infra.pegasus.thrift.*; +import com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData; +import com.xiaomi.infra.pegasus.thrift.meta_data.FieldValueMetaData; +import com.xiaomi.infra.pegasus.thrift.protocol.*; +import com.xiaomi.infra.pegasus.thrift.scheme.IScheme; +import com.xiaomi.infra.pegasus.thrift.scheme.SchemeFactory; +import com.xiaomi.infra.pegasus.thrift.scheme.StandardScheme; +import com.xiaomi.infra.pegasus.thrift.scheme.TupleScheme; +import com.xiaomi.infra.pegasus.thrift.transport.TIOStreamTransport; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class request_meta + implements TBase, + java.io.Serializable, + Cloneable, + Comparable { + private static final TStruct STRUCT_DESC = new TStruct("meta"); + + private static final TField APP_ID_FIELD_DESC = new TField("app_id", TType.I32, (short) 1); + private static final TField PARTITION_INDEX_FIELD_DESC = + new TField("partition_index", TType.I32, (short) 2); + private static final TField CLIENT_TIMEOUT_FIELD_DESC = + new TField("client_timeout", TType.I32, (short) 3); + private static final TField PARTITION_HASH_FIELD_DESC = + new TField("partition_hash", TType.I64, (short) 4); + + private static final Map, SchemeFactory> schemes = + new HashMap, SchemeFactory>(); + + static { + schemes.put(StandardScheme.class, new metaStandardSchemeFactory()); + schemes.put(TupleScheme.class, new metaTupleSchemeFactory()); + } + + public int app_id; // required + public int partition_index; // required + public int client_timeout; // required + public long partition_hash; // required + + /** + * The set of fields this struct contains, along with convenience methods for finding and + * manipulating them. + */ + public enum _Fields implements TFieldIdEnum { + APP_ID((short) 1, "app_id"), + PARTITION_INDEX((short) 2, "partition_index"), + CLIENT_TIMEOUT((short) 3, "client_timeout"), + PARTITION_HASH((short) 4, "partition_hash"); + + private static final Map byName = new HashMap(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** Find the _Fields constant that matches fieldId, or null if its not found. */ + public static _Fields findByThriftId(int fieldId) { + switch (fieldId) { + case 1: // APP_ID + return APP_ID; + case 2: // PARTITION_INDEX + return PARTITION_INDEX; + case 3: // CLIENT_TIMEOUT + return CLIENT_TIMEOUT; + case 4: // PARTITION_HASH + return PARTITION_HASH; + default: + return null; + } + } + + /** Find the _Fields constant that matches fieldId, throwing an exception if it is not found. */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) + throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** Find the _Fields constant that matches name, or null if its not found. */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __APP_ID_ISSET_ID = 0; + private static final int __PARTITION_INDEX_ISSET_ID = 1; + private static final int __CLIENT_TIMEOUT_ISSET_ID = 2; + private static final int __PARTITION_HASH_ISSET_ID = 3; + private byte __isset_bitfield = 0; + public static final Map<_Fields, FieldMetaData> metaDataMap; + + static { + Map<_Fields, FieldMetaData> tmpMap = new EnumMap<_Fields, FieldMetaData>(_Fields.class); + tmpMap.put( + _Fields.APP_ID, + new FieldMetaData( + "app_id", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + tmpMap.put( + _Fields.PARTITION_INDEX, + new FieldMetaData( + "partition_index", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + tmpMap.put( + _Fields.CLIENT_TIMEOUT, + new FieldMetaData( + "client_timeout", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I32))); + tmpMap.put( + _Fields.PARTITION_HASH, + new FieldMetaData( + "partition_hash", TFieldRequirementType.DEFAULT, new FieldValueMetaData(TType.I64))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + FieldMetaData.addStructMetaDataMap(request_meta.class, metaDataMap); + } + + public request_meta() {} + + public request_meta(int app_id, int partition_index, int client_timeout, long partition_hash) { + this(); + this.app_id = app_id; + setApp_idIsSet(true); + this.partition_index = partition_index; + setPartition_indexIsSet(true); + this.client_timeout = client_timeout; + setClient_timeoutIsSet(true); + this.partition_hash = partition_hash; + setPartition_hashIsSet(true); + } + + /** Performs a deep copy on other. */ + public request_meta(request_meta other) { + __isset_bitfield = other.__isset_bitfield; + this.app_id = other.app_id; + this.partition_index = other.partition_index; + this.client_timeout = other.client_timeout; + this.partition_hash = other.partition_hash; + } + + public request_meta deepCopy() { + return new request_meta(this); + } + + @Override + public void clear() { + setApp_idIsSet(false); + this.app_id = 0; + setPartition_indexIsSet(false); + this.partition_index = 0; + setClient_timeoutIsSet(false); + this.client_timeout = 0; + setPartition_hashIsSet(false); + this.partition_hash = 0; + } + + public int getApp_id() { + return this.app_id; + } + + public request_meta setApp_id(int app_id) { + this.app_id = app_id; + setApp_idIsSet(true); + return this; + } + + public void unsetApp_id() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __APP_ID_ISSET_ID); + } + + /** Returns true if field app_id is set (has been assigned a value) and false otherwise */ + public boolean isSetApp_id() { + return EncodingUtils.testBit(__isset_bitfield, __APP_ID_ISSET_ID); + } + + public void setApp_idIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __APP_ID_ISSET_ID, value); + } + + public int getPartition_index() { + return this.partition_index; + } + + public request_meta setPartition_index(int partition_index) { + this.partition_index = partition_index; + setPartition_indexIsSet(true); + return this; + } + + public void unsetPartition_index() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID); + } + + /** + * Returns true if field partition_index is set (has been assigned a value) and false otherwise + */ + public boolean isSetPartition_index() { + return EncodingUtils.testBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID); + } + + public void setPartition_indexIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PARTITION_INDEX_ISSET_ID, value); + } + + public int getClient_timeout() { + return this.client_timeout; + } + + public request_meta setClient_timeout(int client_timeout) { + this.client_timeout = client_timeout; + setClient_timeoutIsSet(true); + return this; + } + + public void unsetClient_timeout() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); + } + + /** Returns true if field client_timeout is set (has been assigned a value) and false otherwise */ + public boolean isSetClient_timeout() { + return EncodingUtils.testBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID); + } + + public void setClient_timeoutIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __CLIENT_TIMEOUT_ISSET_ID, value); + } + + public long getPartition_hash() { + return this.partition_hash; + } + + public request_meta setPartition_hash(long partition_hash) { + this.partition_hash = partition_hash; + setPartition_hashIsSet(true); + return this; + } + + public void unsetPartition_hash() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID); + } + + /** Returns true if field partition_hash is set (has been assigned a value) and false otherwise */ + public boolean isSetPartition_hash() { + return EncodingUtils.testBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID); + } + + public void setPartition_hashIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PARTITION_HASH_ISSET_ID, value); + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case APP_ID: + if (value == null) { + unsetApp_id(); + } else { + setApp_id((Integer) value); + } + break; + + case PARTITION_INDEX: + if (value == null) { + unsetPartition_index(); + } else { + setPartition_index((Integer) value); + } + break; + + case CLIENT_TIMEOUT: + if (value == null) { + unsetClient_timeout(); + } else { + setClient_timeout((Integer) value); + } + break; + + case PARTITION_HASH: + if (value == null) { + unsetPartition_hash(); + } else { + setPartition_hash((Long) value); + } + break; + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case APP_ID: + return getApp_id(); + + case PARTITION_INDEX: + return getPartition_index(); + + case CLIENT_TIMEOUT: + return getClient_timeout(); + + case PARTITION_HASH: + return getPartition_hash(); + } + throw new IllegalStateException(); + } + + /** + * Returns true if field corresponding to fieldID is set (has been assigned a value) and false + * otherwise + */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case APP_ID: + return isSetApp_id(); + case PARTITION_INDEX: + return isSetPartition_index(); + case CLIENT_TIMEOUT: + return isSetClient_timeout(); + case PARTITION_HASH: + return isSetPartition_hash(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) return false; + if (that instanceof request_meta) return this.equals((request_meta) that); + return false; + } + + public boolean equals(request_meta that) { + if (that == null) return false; + + boolean this_present_app_id = true; + boolean that_present_app_id = true; + if (this_present_app_id || that_present_app_id) { + if (!(this_present_app_id && that_present_app_id)) return false; + if (this.app_id != that.app_id) return false; + } + + boolean this_present_partition_index = true; + boolean that_present_partition_index = true; + if (this_present_partition_index || that_present_partition_index) { + if (!(this_present_partition_index && that_present_partition_index)) return false; + if (this.partition_index != that.partition_index) return false; + } + + boolean this_present_client_timeout = true; + boolean that_present_client_timeout = true; + if (this_present_client_timeout || that_present_client_timeout) { + if (!(this_present_client_timeout && that_present_client_timeout)) return false; + if (this.client_timeout != that.client_timeout) return false; + } + + boolean this_present_partition_hash = true; + boolean that_present_partition_hash = true; + if (this_present_partition_hash || that_present_partition_hash) { + if (!(this_present_partition_hash && that_present_partition_hash)) return false; + if (this.partition_hash != that.partition_hash) return false; + } + + return true; + } + + @Override + public int hashCode() { + List list = new ArrayList(); + + boolean present_app_id = true; + list.add(present_app_id); + if (present_app_id) list.add(app_id); + + boolean present_partition_index = true; + list.add(present_partition_index); + if (present_partition_index) list.add(partition_index); + + boolean present_client_timeout = true; + list.add(present_client_timeout); + if (present_client_timeout) list.add(client_timeout); + + boolean present_partition_hash = true; + list.add(present_partition_hash); + if (present_partition_hash) list.add(partition_hash); + + return list.hashCode(); + } + + @Override + public int compareTo(request_meta other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetApp_id()).compareTo(other.isSetApp_id()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetApp_id()) { + lastComparison = TBaseHelper.compareTo(this.app_id, other.app_id); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = + Boolean.valueOf(isSetPartition_index()).compareTo(other.isSetPartition_index()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetPartition_index()) { + lastComparison = TBaseHelper.compareTo(this.partition_index, other.partition_index); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetClient_timeout()).compareTo(other.isSetClient_timeout()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetClient_timeout()) { + lastComparison = TBaseHelper.compareTo(this.client_timeout, other.client_timeout); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetPartition_hash()).compareTo(other.isSetPartition_hash()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetPartition_hash()) { + lastComparison = TBaseHelper.compareTo(this.partition_hash, other.partition_hash); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(TProtocol iprot) throws TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(TProtocol oprot) throws TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("meta("); + boolean first = true; + + sb.append("app_id:"); + sb.append(this.app_id); + first = false; + if (!first) sb.append(", "); + sb.append("partition_index:"); + sb.append(this.partition_index); + first = false; + if (!first) sb.append(", "); + sb.append("client_timeout:"); + sb.append(this.client_timeout); + first = false; + if (!first) sb.append(", "); + sb.append("partition_hash:"); + sb.append(this.partition_hash); + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new TCompactProtocol(new TIOStreamTransport(out))); + } catch (TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) + throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and + // doesn't call the default constructor. + __isset_bitfield = 0; + read(new TCompactProtocol(new TIOStreamTransport(in))); + } catch (TException te) { + throw new java.io.IOException(te); + } + } + + private static class metaStandardSchemeFactory implements SchemeFactory { + public metaStandardScheme getScheme() { + return new metaStandardScheme(); + } + } + + private static class metaStandardScheme extends StandardScheme { + + public void read(TProtocol iprot, request_meta struct) throws TException { + TField schemeField; + iprot.readStructBegin(); + while (true) { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // APP_ID + if (schemeField.type == TType.I32) { + struct.app_id = iprot.readI32(); + struct.setApp_idIsSet(true); + } else { + TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // PARTITION_INDEX + if (schemeField.type == TType.I32) { + struct.partition_index = iprot.readI32(); + struct.setPartition_indexIsSet(true); + } else { + TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // CLIENT_TIMEOUT + if (schemeField.type == TType.I32) { + struct.client_timeout = iprot.readI32(); + struct.setClient_timeoutIsSet(true); + } else { + TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 4: // PARTITION_HASH + if (schemeField.type == TType.I64) { + struct.partition_hash = iprot.readI64(); + struct.setPartition_hashIsSet(true); + } else { + TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(TProtocol oprot, request_meta struct) throws TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(APP_ID_FIELD_DESC); + oprot.writeI32(struct.app_id); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(PARTITION_INDEX_FIELD_DESC); + oprot.writeI32(struct.partition_index); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(CLIENT_TIMEOUT_FIELD_DESC); + oprot.writeI32(struct.client_timeout); + oprot.writeFieldEnd(); + oprot.writeFieldBegin(PARTITION_HASH_FIELD_DESC); + oprot.writeI64(struct.partition_hash); + oprot.writeFieldEnd(); + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + } + + private static class metaTupleSchemeFactory implements SchemeFactory { + public metaTupleScheme getScheme() { + return new metaTupleScheme(); + } + } + + private static class metaTupleScheme extends TupleScheme { + + @Override + public void write(TProtocol prot, request_meta struct) throws TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetApp_id()) { + optionals.set(0); + } + if (struct.isSetPartition_index()) { + optionals.set(1); + } + if (struct.isSetClient_timeout()) { + optionals.set(2); + } + if (struct.isSetPartition_hash()) { + optionals.set(3); + } + oprot.writeBitSet(optionals, 4); + if (struct.isSetApp_id()) { + oprot.writeI32(struct.app_id); + } + if (struct.isSetPartition_index()) { + oprot.writeI32(struct.partition_index); + } + if (struct.isSetClient_timeout()) { + oprot.writeI32(struct.client_timeout); + } + if (struct.isSetPartition_hash()) { + oprot.writeI64(struct.partition_hash); + } + } + + @Override + public void read(TProtocol prot, request_meta struct) throws TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(4); + if (incoming.get(0)) { + struct.app_id = iprot.readI32(); + struct.setApp_idIsSet(true); + } + if (incoming.get(1)) { + struct.partition_index = iprot.readI32(); + struct.setPartition_indexIsSet(true); + } + if (incoming.get(2)) { + struct.client_timeout = iprot.readI32(); + struct.setClient_timeoutIsSet(true); + } + if (incoming.get(3)) { + struct.partition_hash = iprot.readI64(); + struct.setPartition_hashIsSet(true); + } + } + } +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 2b4bc51e..2e115c19 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -5,15 +5,16 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; +import com.xiaomi.infra.pegasus.base.request_meta; import com.xiaomi.infra.pegasus.rpc.ThriftHeader; import com.xiaomi.infra.pegasus.thrift.TException; -import com.xiaomi.infra.pegasus.tools.Tools; public abstract class client_operator { public client_operator(gpid gpid, String tableName) { this.header = new ThriftHeader(); - this.header.app_id = gpid.get_app_id(); - this.header.partition_index = gpid.get_pidx(); + this.meta = new request_meta(); + this.meta.setApp_id(gpid.get_app_id()); + this.meta.setPartition_index(gpid.get_pidx()); this.pid = gpid; this.tableName = tableName; this.rpc_error = new error_code(); @@ -21,17 +22,22 @@ public client_operator(gpid gpid, String tableName) { public client_operator(gpid gpid, String tableName, long partitionHash) { this(gpid, tableName); - this.header.partition_hash = partitionHash; + this.meta.setPartition_hash(partitionHash); } - public final byte[] prepare_thrift_header(int body_length, int client_timeout) { - header.body_length = body_length; - header.header_length = ThriftHeader.HEADER_LENGTH; - header.client_timeout = client_timeout; - header.thread_hash = Tools.dsn_gpid_to_thread_hash(header.app_id, header.partition_index); + public final byte[] prepare_thrift_header(int body_length, int meta_length) { + this.header.body_length = body_length; + this.header.meta_length = meta_length; return header.toByteArray(); } + public final void prepare_thrift_meta( + com.xiaomi.infra.pegasus.thrift.protocol.TProtocol oprot, int client_timeout) + throws TException { + this.meta.setClient_timeout(client_timeout); + this.meta.write(oprot); + } + public String getQPSCounter() { String mark; switch (rpc_error.errno) { @@ -79,6 +85,7 @@ public abstract void recv_data(com.xiaomi.infra.pegasus.thrift.protocol.TProtoco throws TException; public ThriftHeader header; + public request_meta meta; public gpid pid; public String tableName; // only for metrics public error_code rpc_error; diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java index 9ea6611e..f67eae9b 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java @@ -6,32 +6,16 @@ import java.nio.ByteBuffer; public class ThriftHeader { - public static final int HEADER_LENGTH = 48; + public static final int HEADER_LENGTH = 12; static final byte[] HEADER_TYPE = {'T', 'H', 'F', 'T'}; - public int hdr_version = 0; - public int header_length; - public int header_crc32 = 0; public int body_length; - public int body_crc32 = 0; - public int app_id; - public int partition_index; - public int client_timeout = 0; - public int thread_hash; - public long partition_hash; + public int meta_length; public byte[] toByteArray() { ByteBuffer bf = ByteBuffer.allocate(HEADER_LENGTH); bf.put(HEADER_TYPE); - bf.putInt(hdr_version); - bf.putInt(header_length); - bf.putInt(header_crc32); bf.putInt(body_length); - bf.putInt(body_crc32); - bf.putInt(app_id); - bf.putInt(partition_index); - bf.putInt(client_timeout); - bf.putInt(thread_hash); - bf.putLong(partition_hash); + bf.putInt(meta_length); return bf.array(); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java index 26f5319b..bb2b6ef4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java @@ -31,11 +31,19 @@ protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, // write the Memory buffer out.writerIndex(initIndex + ThriftHeader.HEADER_LENGTH); TBinaryProtocol protocol = new TBinaryProtocol(new TByteBufTransport(out)); + + // write meta + e.op.prepare_thrift_meta(protocol, (int) e.timeoutMs); + int meta_length = out.readableBytes() - ThriftHeader.HEADER_LENGTH; + + // write body e.op.send_data(protocol, e.sequenceId); + + // write header out.setBytes( initIndex, e.op.prepare_thrift_header( - out.readableBytes() - ThriftHeader.HEADER_LENGTH, (int) e.timeoutMs)); + out.readableBytes() - ThriftHeader.HEADER_LENGTH - meta_length, meta_length)); } @Override From a2952b40d656c5e3fe68a3b5e006b1a03731d8d9 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Fri, 27 Dec 2019 16:02:42 +0800 Subject: [PATCH 02/15] backup --- .../xiaomi/infra/pegasus/operator/client_operator.java | 4 ++-- .../java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java | 8 +++++--- .../infra/pegasus/rpc/async/ThriftFrameEncoder.java | 6 ++---- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 2e115c19..09d7484f 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -25,9 +25,9 @@ public client_operator(gpid gpid, String tableName, long partitionHash) { this.meta.setPartition_hash(partitionHash); } - public final byte[] prepare_thrift_header(int body_length, int meta_length) { - this.header.body_length = body_length; + public final byte[] prepare_thrift_header(int meta_length, int body_length) { this.header.meta_length = meta_length; + this.header.body_length = body_length; return header.toByteArray(); } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java index f67eae9b..e10ac8fd 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/ThriftHeader.java @@ -6,16 +6,18 @@ import java.nio.ByteBuffer; public class ThriftHeader { - public static final int HEADER_LENGTH = 12; + public static final int HEADER_LENGTH = 16; static final byte[] HEADER_TYPE = {'T', 'H', 'F', 'T'}; - public int body_length; + public int version = 1; public int meta_length; + public int body_length; public byte[] toByteArray() { ByteBuffer bf = ByteBuffer.allocate(HEADER_LENGTH); bf.put(HEADER_TYPE); - bf.putInt(body_length); + bf.putInt(version); bf.putInt(meta_length); + bf.putInt(body_length); return bf.array(); } } diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java index bb2b6ef4..d5046f9c 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java @@ -40,10 +40,8 @@ protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, e.op.send_data(protocol, e.sequenceId); // write header - out.setBytes( - initIndex, - e.op.prepare_thrift_header( - out.readableBytes() - ThriftHeader.HEADER_LENGTH - meta_length, meta_length)); + out.setBytes(initIndex, e.op.prepare_thrift_header(meta_length, + out.readableBytes() - ThriftHeader.HEADER_LENGTH - meta_length)); } @Override From d99b6262629b252cc2ee97d9787efb06163c4900 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Fri, 27 Dec 2019 16:05:20 +0800 Subject: [PATCH 03/15] backup request --- .../xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java index d5046f9c..3345f0fa 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java +++ b/src/main/java/com/xiaomi/infra/pegasus/rpc/async/ThriftFrameEncoder.java @@ -40,8 +40,10 @@ protected void encode(ChannelHandlerContext ctx, ReplicaSession.RequestEntry e, e.op.send_data(protocol, e.sequenceId); // write header - out.setBytes(initIndex, e.op.prepare_thrift_header(meta_length, - out.readableBytes() - ThriftHeader.HEADER_LENGTH - meta_length)); + out.setBytes( + initIndex, + e.op.prepare_thrift_header( + meta_length, out.readableBytes() - ThriftHeader.HEADER_LENGTH - meta_length)); } @Override From 90ff1b4b86b6664df2bd5edd47a28a4eb22be263 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Tue, 31 Dec 2019 14:42:33 +0800 Subject: [PATCH 04/15] fix --- idl/base.thrift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/idl/base.thrift b/idl/base.thrift index 649db19c..d2a6182c 100644 --- a/idl/base.thrift +++ b/idl/base.thrift @@ -28,4 +28,4 @@ struct request_meta { 2:i32 partition_index; 3:i32 client_timeout; 4:i64 partition_hash; -} \ No newline at end of file +} From 4159695d8f99647a0818da628005cf7a8e0a3ea7 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 11:27:27 +0800 Subject: [PATCH 05/15] use snapshot --- scripts/travis.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index bcbdc7ad..8701af00 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,8 +26,8 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release" -PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.6/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz" +PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" # start pegasus onebox environment if [ ! -f $PEGASUS_PKG.tar.gz ]; then From 79a51b7108b8f4a0f7f0d4c3f05be479216b5dcd Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 11:46:23 +0800 Subject: [PATCH 06/15] snapshot --- scripts/travis.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index 8701af00..40feeedf 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,8 +26,9 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" -PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" +PEGASUS_PKG="https://github.com/XiaoMi/pegasus-common/archive/deps.tar.gz" + +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/archive/deps.tar.gz" # start pegasus onebox environment if [ ! -f $PEGASUS_PKG.tar.gz ]; then From be4e5e06de2c414c10de9f844c2d460829b64319 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 12:03:37 +0800 Subject: [PATCH 07/15] use snapshot --- scripts/travis.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index 40feeedf..b311786e 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,7 +26,7 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="https://github.com/XiaoMi/pegasus-common/archive/deps.tar.gz" +PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/archive/deps.tar.gz" From 159632023bd6c9b9b13a1ef314160bf7b0b19170 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 12:08:21 +0800 Subject: [PATCH 08/15] use snapshot --- scripts/travis.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index b311786e..37ddd042 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,7 +26,7 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" +PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release" PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/archive/deps.tar.gz" From 3b4580357f2c9dd9b03060fc3bb92358d6de3632 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 14:18:43 +0800 Subject: [PATCH 09/15] update snapshot --- scripts/travis.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index bcbdc7ad..1778efbb 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,8 +26,8 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release" -PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus/releases/download/v1.11.6/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz" +PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" # start pegasus onebox environment if [ ! -f $PEGASUS_PKG.tar.gz ]; then From 468f21076db090955e766a606e7552f2475633e3 Mon Sep 17 00:00:00 2001 From: levy5307 Date: Wed, 19 Feb 2020 15:45:52 +0800 Subject: [PATCH 10/15] use snapshot --- scripts/travis.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index 1778efbb..467b9c7d 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,8 +26,8 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release" -PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-server-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" +PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz" # start pegasus onebox environment if [ ! -f $PEGASUS_PKG.tar.gz ]; then From 99230f6d48ca2968eb36f9cb92173ffa53f3dda8 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Wed, 19 Feb 2020 18:18:41 +0800 Subject: [PATCH 11/15] temp --- ._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release | Bin 0 -> 222 bytes scripts/travis.sh | 4 +++- 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100755 ._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release diff --git a/._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release b/._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release new file mode 100755 index 0000000000000000000000000000000000000000..e1e30428ce2a33bc41e2d1fd180f22711ff9d368 GIT binary patch literal 222 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDI}@fsio@$UgK5x_AdBnYYuq+J^qI7A5ADWagzZ6zUroSQuKHrkW%t8k(nAyXWVp=cL9|7#TQc v6y@ipS{u4pIvTr~n(CUkIhyF2m^i!WT9~;x=^7fknV7n|8CsZ{nlk_ZF?S Date: Fri, 21 Feb 2020 14:06:07 +0800 Subject: [PATCH 12/15] fix --- scripts/travis.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/travis.sh b/scripts/travis.sh index 467b9c7d..8fd4a8d1 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,8 +26,8 @@ if [[ $(git status -s) ]]; then exit 1 fi -PEGASUS_PKG="pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release" -PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release.tar.gz" +PEGASUS_PKG="pegasus-tools-1.13.SNAPSHOT-695b366-glibc2.17-release" +PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-tools-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" # start pegasus onebox environment if [ ! -f $PEGASUS_PKG.tar.gz ]; then From a99d248a5e2624426492925caeb23b5613a34e4d Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Mon, 24 Feb 2020 16:49:40 +0800 Subject: [PATCH 13/15] refactor --- idl/base.thrift | 7 ------- idl/replication.thrift | 7 +++++++ .../com/xiaomi/infra/pegasus/operator/client_operator.java | 2 +- .../infra/pegasus/{base => replication}/request_meta.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) rename src/main/java/com/xiaomi/infra/pegasus/{base => replication}/request_meta.java (99%) diff --git a/idl/base.thrift b/idl/base.thrift index d2a6182c..7728e6c6 100644 --- a/idl/base.thrift +++ b/idl/base.thrift @@ -22,10 +22,3 @@ struct rpc_address struct gpid { } - -struct request_meta { - 1:i32 app_id; - 2:i32 partition_index; - 3:i32 client_timeout; - 4:i64 partition_hash; -} diff --git a/idl/replication.thrift b/idl/replication.thrift index 29be735e..56af41a3 100644 --- a/idl/replication.thrift +++ b/idl/replication.thrift @@ -31,3 +31,10 @@ struct query_cfg_response 4:bool is_stateful; 5:list partitions; } + +struct request_meta { + 1:i32 app_id; + 2:i32 partition_index; + 3:i32 client_timeout; + 4:i64 partition_hash; +} diff --git a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java index 09d7484f..c325d3c3 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java +++ b/src/main/java/com/xiaomi/infra/pegasus/operator/client_operator.java @@ -5,7 +5,7 @@ import com.xiaomi.infra.pegasus.base.error_code; import com.xiaomi.infra.pegasus.base.gpid; -import com.xiaomi.infra.pegasus.base.request_meta; +import com.xiaomi.infra.pegasus.replication.request_meta; import com.xiaomi.infra.pegasus.rpc.ThriftHeader; import com.xiaomi.infra.pegasus.thrift.TException; diff --git a/src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java b/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java similarity index 99% rename from src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java rename to src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java index a7228d82..6513ebc4 100644 --- a/src/main/java/com/xiaomi/infra/pegasus/base/request_meta.java +++ b/src/main/java/com/xiaomi/infra/pegasus/replication/request_meta.java @@ -5,7 +5,7 @@ * * @generated */ -package com.xiaomi.infra.pegasus.base; +package com.xiaomi.infra.pegasus.replication; import com.xiaomi.infra.pegasus.thrift.*; import com.xiaomi.infra.pegasus.thrift.meta_data.FieldMetaData; From d14abff7fdb84239d9616523a60912312b2bc7a8 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Mon, 24 Feb 2020 16:53:19 +0800 Subject: [PATCH 14/15] add note --- scripts/travis.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/scripts/travis.sh b/scripts/travis.sh index 8fd4a8d1..f2bae5c5 100755 --- a/scripts/travis.sh +++ b/scripts/travis.sh @@ -26,6 +26,8 @@ if [[ $(git status -s) ]]; then exit 1 fi +# The new version of pegasus client is not compatible with old version server which contains old rpc protocol, +# So we use snapshot version of pegasus-tools, because we don`t have a new release version, which contains the new version of rpc protocol, PEGASUS_PKG="pegasus-tools-1.13.SNAPSHOT-695b366-glibc2.17-release" PEGASUS_PKG_URL="https://github.com/XiaoMi/pegasus-common/releases/download/deps/pegasus-tools-1.13.SNAPSHOT-695b366-glibc2.17-release.tar.gz" From 6fb9c856f7aa9c637e150e9ef0a830f644f07041 Mon Sep 17 00:00:00 2001 From: zhaoliwei Date: Fri, 28 Feb 2020 14:09:00 +0800 Subject: [PATCH 15/15] delted unused file --- ._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release | Bin 222 -> 0 bytes 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100755 ._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release diff --git a/._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release b/._pegasus-tools-1.11.6-9f4e5ae-glibc2.12-release deleted file mode 100755 index e1e30428ce2a33bc41e2d1fd180f22711ff9d368..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 222 zcmZQz6=P>$Vqox1Ojhs@R)|o50+1L3ClDI}@fsio@$UgK5x_AdBnYYuq+J^qI7A5ADWagzZ6zUroSQuKHrkW%t8k(nAyXWVp=cL9|7#TQc v6y@ipS{u4pIvTr~n(CUkIhyF2m^i!WT9~;x=^7fknV7n|8CsZ{nlk_ZF?S