Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tsfile java interfaces & Subscription: The usage of bitmap in SubscriptionSessionDataSet may cause npe #14124

Merged
merged 19 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void customize(
public void process(
final TabletInsertionEvent tabletInsertionEvent, final EventCollector eventCollector) {
tabletInsertionEvent.processTablet(
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.rowSize));
(tablet, rowCollector) -> writePointCount.addAndGet(tablet.getRowSize()));
}

@Override
Expand All @@ -71,7 +71,6 @@ public void process(final Event event, final EventCollector eventCollector) thro
Collections.singletonList(
new MeasurementSchema(aggregateSeries.getMeasurement(), TSDataType.INT64)),
1);
tablet.rowSize = 1;
tablet.addTimestamp(0, System.currentTimeMillis());
tablet.addValue(aggregateSeries.getMeasurement(), 0, writePointCount.get());
eventCollector.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,20 +309,21 @@ private static void insertTabletWithAlignedTimeseriesMethod1()
long timestamp = 1;

for (long row = 1; row < 100; row++) {
int rowIndex = tablet.rowSize++;
int rowIndex = tablet.getRowSize();
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue(
schemaList.get(0).getMeasurementId(), rowIndex, new SecureRandom().nextLong());
tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, new SecureRandom().nextInt());
schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextLong());
tablet.addValue(
schemaList.get(1).getMeasurementName(), rowIndex, new SecureRandom().nextInt());

if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
}
timestamp++;
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertAlignedTablet(tablet);
tablet.reset();
}
Expand All @@ -344,22 +345,22 @@ private static void insertTabletWithAlignedTimeseriesMethod2()
Object[] values = tablet.values;

for (long time = 100; time < 200; time++) {
int row = tablet.rowSize++;
timestamps[row] = time;
int row = tablet.getRowSize();
tablet.addTimestamp(row, time);

long[] sensor1 = (long[]) values[0];
sensor1[row] = new SecureRandom().nextLong();

int[] sensor2 = (int[]) values[1];
sensor2[row] = new SecureRandom().nextInt();

if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
}
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
}
Expand All @@ -385,8 +386,8 @@ private static void insertNullableTabletWithAlignedTimeseries()

bitMaps[1] = new BitMap(tablet.getMaxRowNumber());
for (long time = 200; time < 300; time++) {
int row = tablet.rowSize++;
timestamps[row] = time;
int row = tablet.getRowSize();
tablet.addTimestamp(row, time);

long[] sensor1 = (long[]) values[0];
sensor1[row] = new SecureRandom().nextLong();
Expand All @@ -399,14 +400,14 @@ private static void insertNullableTabletWithAlignedTimeseries()
bitMaps[1].mark(row);
}

if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
bitMaps[1].reset();
}
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
}
Expand Down Expand Up @@ -567,19 +568,19 @@ private static void insertTabletsWithAlignedTimeseries()
// Method 1 to add tablet data
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
int row1 = tablet1.getRowSize();
int row2 = tablet2.getRowSize();
int row3 = tablet3.getRowSize();
tablet1.addTimestamp(row1, timestamp);
tablet2.addTimestamp(row2, timestamp);
tablet3.addTimestamp(row3, timestamp);
for (int i = 0; i < 2; i++) {
long value = new SecureRandom().nextLong();
tablet1.addValue(schemaList1.get(i).getMeasurementId(), row1, value);
tablet2.addValue(schemaList2.get(i).getMeasurementId(), row2, value);
tablet3.addValue(schemaList3.get(i).getMeasurementId(), row3, value);
tablet1.addValue(schemaList1.get(i).getMeasurementName(), row1, value);
tablet2.addValue(schemaList2.get(i).getMeasurementName(), row2, value);
tablet3.addValue(schemaList3.get(i).getMeasurementName(), row3, value);
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
if (tablet1.getRowSize() == tablet1.getMaxRowNumber()) {
session.insertAlignedTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
Expand All @@ -588,7 +589,7 @@ private static void insertTabletsWithAlignedTimeseries()
timestamp++;
}

if (tablet1.rowSize != 0) {
if (tablet1.getRowSize() != 0) {
session.insertAlignedTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
Expand All @@ -604,12 +605,12 @@ private static void insertTabletsWithAlignedTimeseries()
Object[] values3 = tablet3.values;

for (long time = 0; time < 100; time++) {
int row1 = tablet1.rowSize++;
int row2 = tablet2.rowSize++;
int row3 = tablet3.rowSize++;
timestamps1[row1] = time;
timestamps2[row2] = time;
timestamps3[row3] = time;
int row1 = tablet1.getRowSize();
int row2 = tablet2.getRowSize();
int row3 = tablet3.getRowSize();
tablet1.addTimestamp(row1, time);
tablet2.addTimestamp(row2, time);
tablet3.addTimestamp(row3, time);
for (int i = 0; i < 2; i++) {
long[] sensor1 = (long[]) values1[i];
sensor1[row1] = i;
Expand All @@ -618,7 +619,7 @@ private static void insertTabletsWithAlignedTimeseries()
long[] sensor3 = (long[]) values3[i];
sensor3[row3] = i;
}
if (tablet1.rowSize == tablet1.getMaxRowNumber()) {
if (tablet1.getRowSize() == tablet1.getMaxRowNumber()) {
session.insertAlignedTablets(tabletMap, true);

tablet1.reset();
Expand All @@ -627,7 +628,7 @@ private static void insertTabletsWithAlignedTimeseries()
}
}

if (tablet1.rowSize != 0) {
if (tablet1.getRowSize() != 0) {
session.insertAlignedTablets(tabletMap, true);
tablet1.reset();
tablet2.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,53 +141,55 @@ public Void call() {
}
tablet = new Tablet(device, schemaList, 300000);
while (dataIter.next()) {
int row = tablet.rowSize++;
tablet.timestamps[row] = dataIter.getLong(1);
int row = tablet.getRowSize();
tablet.addTimestamp(row, dataIter.getLong(1));
for (int j = 0; j < schemaList.size(); ++j) {
if (dataIter.isNull(j + 2)) {
tablet.addValue(schemaList.get(j).getMeasurementId(), row, null);
tablet.addValue(schemaList.get(j).getMeasurementName(), row, null);
continue;
}
switch (schemaList.get(j).getType()) {
case BOOLEAN:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getBoolean(j + 2));
schemaList.get(j).getMeasurementName(), row, dataIter.getBoolean(j + 2));
break;
case INT32:
tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getInt(j + 2));
tablet.addValue(
schemaList.get(j).getMeasurementName(), row, dataIter.getInt(j + 2));
break;
case INT64:
case TIMESTAMP:
tablet.addValue(schemaList.get(j).getMeasurementId(), row, dataIter.getLong(j + 2));
tablet.addValue(
schemaList.get(j).getMeasurementName(), row, dataIter.getLong(j + 2));
break;
case FLOAT:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getFloat(j + 2));
schemaList.get(j).getMeasurementName(), row, dataIter.getFloat(j + 2));
break;
case DOUBLE:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getDouble(j + 2));
schemaList.get(j).getMeasurementName(), row, dataIter.getDouble(j + 2));
break;
case TEXT:
case STRING:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getString(j + 2));
schemaList.get(j).getMeasurementName(), row, dataIter.getString(j + 2));
break;
case DATE:
case BLOB:
tablet.addValue(
schemaList.get(j).getMeasurementId(), row, dataIter.getObject(j + 2));
schemaList.get(j).getMeasurementName(), row, dataIter.getObject(j + 2));
break;
default:
LOGGER.info("Migration of this type of data is not supported");
}
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
writerPool.insertTablet(tablet, true);
tablet.reset();
}
}
if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
writerPool.insertTablet(tablet);
tablet.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,19 @@ private static void insertTabletWithAlignedTimeseriesMethod(int minTime, int max
long timestamp = minTime;

for (long row = minTime; row < maxTime; row++) {
int rowIndex = tablet.rowSize++;
int rowIndex = tablet.getRowSize();
tablet.addTimestamp(rowIndex, timestamp);
tablet.addValue(schemaList.get(0).getMeasurementId(), rowIndex, row * 10 + 1L);
tablet.addValue(schemaList.get(1).getMeasurementId(), rowIndex, (int) (row * 10 + 2));
tablet.addValue(schemaList.get(0).getMeasurementName(), rowIndex, row * 10 + 1L);
tablet.addValue(schemaList.get(1).getMeasurementName(), rowIndex, (int) (row * 10 + 2));

if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertAlignedTablet(tablet, true);
tablet.reset();
}
timestamp++;
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertAlignedTablet(tablet);
tablet.reset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,20 +154,20 @@ private static void insertTablet(Session session, String deviceId)
// Method 1 to add tablet data
long timestamp = System.currentTimeMillis();
for (long row = 0; row < 100; row++) {
int rowIndex = tablet.rowSize++;
int rowIndex = tablet.getRowSize();
tablet.addTimestamp(rowIndex, timestamp);
for (int s = 0; s < 3; s++) {
long value = random.nextLong();
tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, value);
tablet.addValue(schemaList.get(s).getMeasurementName(), rowIndex, value);
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
timestamp++;
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertTablet(tablet);
tablet.reset();
}
Expand All @@ -177,19 +177,19 @@ private static void insertTablet(Session session, String deviceId)
Object[] values = tablet.values;

for (long time = 0; time < 100; time++) {
int row = tablet.rowSize++;
timestamps[row] = time;
int row = tablet.getRowSize();
tablet.addTimestamp(row, time);
for (int i = 0; i < 3; i++) {
long[] sensor = (long[]) values[i];
sensor[row] = i;
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
session.insertTablet(tablet, true);
tablet.reset();
}
}

if (tablet.rowSize != 0) {
if (tablet.getRowSize() != 0) {
session.insertTablet(tablet);
tablet.reset();
}
Expand Down
Loading
Loading