Skip to content

Commit 9e81246

Browse files
committed
Merge branch 'master' into sub-iot-v2
2 parents 5d10f4b + 4342166 commit 9e81246

File tree

42 files changed

+2869
-10
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+2869
-10
lines changed

‎LICENSE‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,3 +270,18 @@ The following files include code modified from Apache Kafka project.
270270

271271
Project page: https://github.com/apache/kafka
272272
License: https://github.com/apache/kafka/blob/trunk/LICENSE
273+
274+
--------------------------------------------------------------------------------
275+
276+
The following files include code modified from Stream-Lib project.
277+
278+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/Counter.java
279+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/DoublyLinkedList.java
280+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ExternalizableUtil.java
281+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ITopK.java
282+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/ListNode2.java
283+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/Pair.java
284+
./iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/approximate/StreamSummary.java
285+
286+
Project page: https://github.com/addthis/stream-lib
287+
License: https://github.com/addthis/stream-lib/blob/master/LICENSE.txt

‎integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java‎

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4165,6 +4165,51 @@ public void approxCountDistinctTest() {
41654165
DATABASE_NAME);
41664166
}
41674167

4168+
@Test
4169+
public void approxMostFrequentTest() {
4170+
String[] expectedHeader = buildHeaders(7);
4171+
String[] retArray =
4172+
new String[] {
4173+
"{\"50000\":8},{\"30.0\":8},{\"55.0\":8},{\"true\":12},{\"0xcafebabe55\":8},{\"1727158540000\":12},{\"20240924\":20},"
4174+
};
4175+
tableResultSetEqualTest(
4176+
"select approx_most_frequent(s2, 1, 10), approx_most_frequent(s3, 1, 10), approx_most_frequent(s4, 1, 10), approx_most_frequent(s5, 1, 10), approx_most_frequent(s8, 1, 10), approx_most_frequent(s9, 1, 10), approx_most_frequent(s10, 1, 10) from table1",
4177+
expectedHeader,
4178+
retArray,
4179+
DATABASE_NAME);
4180+
4181+
expectedHeader = new String[] {"time", "province", "_col2"};
4182+
retArray =
4183+
new String[] {
4184+
"2024-09-24T06:15:30.000Z,beijing,{},",
4185+
"2024-09-24T06:15:31.000Z,beijing,{\"31000\":2},",
4186+
"2024-09-24T06:15:35.000Z,beijing,{\"35000\":2},",
4187+
"2024-09-24T06:15:36.000Z,beijing,{},",
4188+
"2024-09-24T06:15:40.000Z,beijing,{\"40000\":2},",
4189+
"2024-09-24T06:15:41.000Z,beijing,{},",
4190+
"2024-09-24T06:15:46.000Z,beijing,{\"46000\":2},",
4191+
"2024-09-24T06:15:50.000Z,beijing,{\"50000\":4},",
4192+
"2024-09-24T06:15:51.000Z,beijing,{},",
4193+
"2024-09-24T06:15:55.000Z,beijing,{},",
4194+
"2024-09-24T06:15:30.000Z,shanghai,{},",
4195+
"2024-09-24T06:15:31.000Z,shanghai,{\"31000\":2},",
4196+
"2024-09-24T06:15:35.000Z,shanghai,{\"35000\":2},",
4197+
"2024-09-24T06:15:36.000Z,shanghai,{},",
4198+
"2024-09-24T06:15:40.000Z,shanghai,{\"40000\":2},",
4199+
"2024-09-24T06:15:41.000Z,shanghai,{},",
4200+
"2024-09-24T06:15:46.000Z,shanghai,{\"46000\":2},",
4201+
"2024-09-24T06:15:50.000Z,shanghai,{\"50000\":4},",
4202+
"2024-09-24T06:15:51.000Z,shanghai,{},",
4203+
"2024-09-24T06:15:55.000Z,shanghai,{},",
4204+
};
4205+
4206+
tableResultSetEqualTest(
4207+
"SELECT time,province,approx_most_frequent(s2, 1, 10) from table1 group by 1,2 order by 2,1",
4208+
expectedHeader,
4209+
retArray,
4210+
DATABASE_NAME);
4211+
}
4212+
41684213
@Test
41694214
public void exceptionTest() {
41704215
tableAssertTestFail(
@@ -4211,6 +4256,14 @@ public void exceptionTest() {
42114256
"select approx_count_distinct(province, 'test') from table1",
42124257
"701: Second argument of Aggregate functions [approx_count_distinct] should be numberic type and do not use expression",
42134258
DATABASE_NAME);
4259+
tableAssertTestFail(
4260+
"select approx_most_frequent(province, -10, 100) from table1",
4261+
"701: The second and third argument must be greater than 0, but got k=-10, capacity=100",
4262+
DATABASE_NAME);
4263+
tableAssertTestFail(
4264+
"select approx_most_frequent(province, 'test', 100) from table1",
4265+
"701: The second and third argument of 'approx_most_frequent' function must be numeric literal",
4266+
DATABASE_NAME);
42144267
}
42154268

42164269
// ==================================================================
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
16+
17+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
18+
19+
import com.google.gson.Gson;
20+
import org.apache.tsfile.block.column.ColumnBuilder;
21+
import org.apache.tsfile.file.metadata.statistics.Statistics;
22+
import org.apache.tsfile.utils.Binary;
23+
24+
import java.nio.charset.StandardCharsets;
25+
26+
public abstract class AbstractApproxMostFrequentAccumulator<T> implements TableAccumulator {
27+
28+
protected SpaceSavingStateFactory.SingleSpaceSavingState<T> state =
29+
SpaceSavingStateFactory.createSingleState();
30+
31+
@Override
32+
public void evaluateIntermediate(ColumnBuilder columnBuilder) {
33+
columnBuilder.writeBinary(new Binary(state.getSpaceSaving().serialize()));
34+
}
35+
36+
@Override
37+
public void evaluateFinal(ColumnBuilder columnBuilder) {
38+
columnBuilder.writeBinary(
39+
new Binary(new Gson().toJson(state.getSpaceSaving().getBuckets()), StandardCharsets.UTF_8));
40+
}
41+
42+
@Override
43+
public boolean hasFinalResult() {
44+
return false;
45+
}
46+
47+
@Override
48+
public void addStatistics(Statistics[] statistics) {
49+
throw new UnsupportedOperationException(
50+
"ApproxMostFrequentAccumulator does not support statistics");
51+
}
52+
53+
@Override
54+
public void reset() {
55+
state.getSpaceSaving().reset();
56+
}
57+
}

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java‎

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
import org.apache.iotdb.common.rpc.thrift.TAggregationType;
2323
import org.apache.iotdb.commons.udf.utils.UDFDataTypeTransformer;
2424
import org.apache.iotdb.db.queryengine.execution.aggregation.VarianceAccumulator;
25+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BinaryGroupedApproxMostFrequentAccumulator;
26+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BlobGroupedApproxMostFrequentAccumulator;
27+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.BooleanGroupedApproxMostFrequentAccumulator;
28+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.DoubleGroupedApproxMostFrequentAccumulator;
29+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.FloatGroupedApproxMostFrequentAccumulator;
2530
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAccumulator;
2631
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedApproxCountDistinctAccumulator;
2732
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAvgAccumulator;
@@ -41,6 +46,8 @@
4146
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedSumAccumulator;
4247
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedUserDefinedAggregateAccumulator;
4348
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedVarianceAccumulator;
49+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.IntGroupedApproxMostFrequentAccumulator;
50+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.LongGroupedApproxMostFrequentAccumulator;
4451
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.UpdateMemory;
4552
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.MarkDistinctHash;
4653
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
@@ -57,6 +64,7 @@
5764
import org.apache.tsfile.read.common.block.column.IntColumn;
5865
import org.apache.tsfile.read.common.type.Type;
5966
import org.apache.tsfile.read.common.type.TypeFactory;
67+
import org.apache.tsfile.write.UnSupportedDataTypeException;
6068

6169
import java.util.Arrays;
6270
import java.util.List;
@@ -246,6 +254,8 @@ private static GroupedAccumulator createBuiltinGroupedAccumulator(
246254
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
247255
case APPROX_COUNT_DISTINCT:
248256
return new GroupedApproxCountDistinctAccumulator(inputDataTypes.get(0));
257+
case APPROX_MOST_FREQUENT:
258+
return getGroupedApproxMostFrequentAccumulator(inputDataTypes.get(0));
249259
default:
250260
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
251261
}
@@ -313,11 +323,63 @@ public static TableAccumulator createBuiltinAccumulator(
313323
inputDataTypes.get(0), VarianceAccumulator.VarianceType.VAR_POP);
314324
case APPROX_COUNT_DISTINCT:
315325
return new ApproxCountDistinctAccumulator(inputDataTypes.get(0));
326+
case APPROX_MOST_FREQUENT:
327+
return getApproxMostFrequentAccumulator(inputDataTypes.get(0));
316328
default:
317329
throw new IllegalArgumentException("Invalid Aggregation function: " + aggregationType);
318330
}
319331
}
320332

333+
public static GroupedAccumulator getGroupedApproxMostFrequentAccumulator(TSDataType type) {
334+
switch (type) {
335+
case BOOLEAN:
336+
return new BooleanGroupedApproxMostFrequentAccumulator();
337+
case INT32:
338+
case DATE:
339+
return new IntGroupedApproxMostFrequentAccumulator();
340+
case INT64:
341+
case TIMESTAMP:
342+
return new LongGroupedApproxMostFrequentAccumulator();
343+
case FLOAT:
344+
return new FloatGroupedApproxMostFrequentAccumulator();
345+
case DOUBLE:
346+
return new DoubleGroupedApproxMostFrequentAccumulator();
347+
case TEXT:
348+
case STRING:
349+
return new BinaryGroupedApproxMostFrequentAccumulator();
350+
case BLOB:
351+
return new BlobGroupedApproxMostFrequentAccumulator();
352+
default:
353+
throw new UnSupportedDataTypeException(
354+
String.format("Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: %s", type));
355+
}
356+
}
357+
358+
public static TableAccumulator getApproxMostFrequentAccumulator(TSDataType type) {
359+
switch (type) {
360+
case BOOLEAN:
361+
return new BooleanApproxMostFrequentAccumulator();
362+
case INT32:
363+
case DATE:
364+
return new IntApproxMostFrequentAccumulator();
365+
case INT64:
366+
case TIMESTAMP:
367+
return new LongApproxMostFrequentAccumulator();
368+
case FLOAT:
369+
return new FloatApproxMostFrequentAccumulator();
370+
case DOUBLE:
371+
return new DoubleApproxMostFrequentAccumulator();
372+
case TEXT:
373+
case STRING:
374+
return new BinaryApproxMostFrequentAccumulator();
375+
case BLOB:
376+
return new BlobApproxMostFrequentAccumulator();
377+
default:
378+
throw new UnSupportedDataTypeException(
379+
String.format("Unsupported data type in APPROX_COUNT_DISTINCT Aggregation: %s", type));
380+
}
381+
}
382+
321383
public static boolean isMultiInputAggregation(TAggregationType aggregationType) {
322384
switch (aggregationType) {
323385
case MAX_BY:

‎iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/ApproxCountDistinctAccumulator.java‎

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
1616

17+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog;
18+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLogStateFactory;
19+
1720
import org.apache.tsfile.block.column.Column;
1821
import org.apache.tsfile.block.column.ColumnBuilder;
1922
import org.apache.tsfile.enums.TSDataType;
@@ -24,7 +27,7 @@
2427
import org.apache.tsfile.write.UnSupportedDataTypeException;
2528

2629
import static com.google.common.base.Preconditions.checkArgument;
27-
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.HyperLogLog.DEFAULT_STANDARD_ERROR;
30+
import static org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.HyperLogLog.DEFAULT_STANDARD_ERROR;
2831

2932
public class ApproxCountDistinctAccumulator implements TableAccumulator {
3033
private static final long INSTANCE_SIZE =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
15+
package org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation;
16+
17+
import org.apache.iotdb.db.exception.sql.SemanticException;
18+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.Counter;
19+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSaving;
20+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.approximate.SpaceSavingStateFactory;
21+
22+
import org.apache.tsfile.block.column.Column;
23+
import org.apache.tsfile.utils.Binary;
24+
import org.apache.tsfile.utils.RamUsageEstimator;
25+
import org.apache.tsfile.utils.ReadWriteIOUtils;
26+
27+
import java.nio.ByteBuffer;
28+
import java.util.List;
29+
30+
public class BinaryApproxMostFrequentAccumulator
31+
extends AbstractApproxMostFrequentAccumulator<Binary> {
32+
33+
private static final long INSTANCE_SIZE =
34+
RamUsageEstimator.shallowSizeOfInstance(BinaryApproxMostFrequentAccumulator.class);
35+
36+
public BinaryApproxMostFrequentAccumulator() {}
37+
38+
@Override
39+
public long getEstimatedSize() {
40+
return INSTANCE_SIZE
41+
+ RamUsageEstimator.shallowSizeOfInstance(BinaryApproxMostFrequentAccumulator.class)
42+
+ state.getEstimatedSize();
43+
}
44+
45+
@Override
46+
public TableAccumulator copy() {
47+
return new BinaryApproxMostFrequentAccumulator();
48+
}
49+
50+
@Override
51+
public void addInput(Column[] arguments, AggregationMask mask) {
52+
int maxBuckets = arguments[1].getInt(0);
53+
int capacity = arguments[2].getInt(0);
54+
if (maxBuckets <= 0 || capacity <= 0) {
55+
throw new SemanticException(
56+
"The second and third argument must be greater than 0, but got k="
57+
+ maxBuckets
58+
+ ", capacity="
59+
+ capacity);
60+
}
61+
SpaceSaving<Binary> spaceSaving = getOrCreateSpaceSaving(state, maxBuckets, capacity);
62+
63+
int positionCount = mask.getPositionCount();
64+
65+
Column valueColumn = arguments[0];
66+
67+
if (mask.isSelectAll()) {
68+
for (int i = 0; i < valueColumn.getPositionCount(); i++) {
69+
if (!valueColumn.isNull(i)) {
70+
spaceSaving.add(valueColumn.getBinary(i));
71+
}
72+
}
73+
} else {
74+
int[] selectedPositions = mask.getSelectedPositions();
75+
int position;
76+
for (int i = 0; i < positionCount; i++) {
77+
position = selectedPositions[i];
78+
if (!valueColumn.isNull(position)) {
79+
spaceSaving.add(valueColumn.getBinary(position));
80+
}
81+
}
82+
}
83+
}
84+
85+
@Override
86+
public void addIntermediate(Column argument) {
87+
for (int i = 0; i < argument.getPositionCount(); i++) {
88+
if (!argument.isNull(i)) {
89+
SpaceSaving<Binary> current =
90+
new SpaceSaving<>(
91+
argument.getBinary(i).getValues(),
92+
BinaryApproxMostFrequentAccumulator::serializeBucket,
93+
BinaryApproxMostFrequentAccumulator::deserializeBucket,
94+
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
95+
state.merge(current);
96+
}
97+
}
98+
}
99+
100+
public static SpaceSaving<Binary> getOrCreateSpaceSaving(
101+
SpaceSavingStateFactory.SingleSpaceSavingState<Binary> state, int maxBuckets, int capacity) {
102+
SpaceSaving<Binary> spaceSaving = state.getSpaceSaving();
103+
if (spaceSaving == null) {
104+
spaceSaving =
105+
new SpaceSaving<>(
106+
maxBuckets,
107+
capacity,
108+
BinaryApproxMostFrequentAccumulator::serializeBucket,
109+
BinaryApproxMostFrequentAccumulator::deserializeBucket,
110+
BinaryApproxMostFrequentAccumulator::calculateKeyByte);
111+
state.setSpaceSaving(spaceSaving);
112+
}
113+
return spaceSaving;
114+
}
115+
116+
public static void serializeBucket(Binary key, long count, ByteBuffer output) {
117+
ReadWriteIOUtils.write(key, output);
118+
ReadWriteIOUtils.write(count, output);
119+
}
120+
121+
public static void deserializeBucket(ByteBuffer input, SpaceSaving<Binary> spaceSaving) {
122+
Binary key = ReadWriteIOUtils.readBinary(input);
123+
long count = ReadWriteIOUtils.readLong(input);
124+
spaceSaving.add(key, count);
125+
}
126+
127+
public static int calculateKeyByte(List<Counter<Binary>> counters) {
128+
return counters.stream().mapToInt(counter -> counter.getItem().getLength()).sum();
129+
}
130+
}

0 commit comments

Comments
 (0)