From c9a4302971d26a478ba4c50cd424fb62670dc7d2 Mon Sep 17 00:00:00 2001 From: Evgeniy Cheban Date: Tue, 21 Apr 2026 14:53:03 +0300 Subject: [PATCH] Support Query Expressions Closes gh-129 --- .github/workflows/pr-build-workflow.yml | 15 +- README.md | 66 +++++ .../java/ru/rt/restream/reindexer/Query.java | 158 +++++++---- .../restream/reindexer/QueryLogBuilder.java | 17 +- .../ru/rt/restream/reindexer/TimeUnit.java | 71 +++++ .../reindexer/binding/cproto/ByteBuffer.java | 53 ++++ .../binding/cproto/PhysicalConnection.java | 19 +- .../reindexer/expression/Expression.java | 116 ++++++++ .../reindexer/expression/FieldExpression.java | 53 ++++ .../expression/FlatArrayLengthFunction.java | 53 ++++ .../expression/FunctionExpression.java | 74 +++++ .../expression/LiteralExpression.java | 88 ++++++ .../reindexer/expression/NowFunction.java | 59 ++++ .../reindexer/expression/SetExpression.java | 32 +++ .../expression/StringExpression.java | 52 ++++ .../expression/SubQueryExpression.java | 54 ++++ .../reindexer/expression/WhereExpression.java | 32 +++ .../reindexer/expression/package-info.java | 19 ++ .../rt/restream/reindexer/TimeUnitTest.java | 57 ++++ .../reindexer/connector/ReindexerTest.java | 256 ++++++++++++++++++ .../reindexer/connector/SubQueryTest.java | 111 ++++++++ 21 files changed, 1374 insertions(+), 81 deletions(-) create mode 100644 src/main/java/ru/rt/restream/reindexer/TimeUnit.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/Expression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/FieldExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/FlatArrayLengthFunction.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/FunctionExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/LiteralExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/NowFunction.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/SetExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/StringExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/SubQueryExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/WhereExpression.java create mode 100644 src/main/java/ru/rt/restream/reindexer/expression/package-info.java create mode 100644 src/test/java/ru/rt/restream/reindexer/TimeUnitTest.java diff --git a/.github/workflows/pr-build-workflow.yml b/.github/workflows/pr-build-workflow.yml index 3269166..076c834 100644 --- a/.github/workflows/pr-build-workflow.yml +++ b/.github/workflows/pr-build-workflow.yml @@ -14,10 +14,17 @@ jobs: with: java-version: 1.11 - name: Install Reindexer + # TODO: Temporarily build reindexer from sources until https://github.com/Restream/reindexer/issues/92 is released. + # run: | + # sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc + # echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list + # sudo apt-get update + # sudo apt-get install -y reindexer-dev libopenblas-pthread-dev run: | - sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc - echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list - sudo apt-get update - sudo apt-get install -y reindexer-dev libopenblas-pthread-dev + curl -L https://github.com/Restream/reindexer/raw/master/dependencies.sh | sudo bash -s + git clone https://github.com/Restream/reindexer + cmake reindexer -Breindexer/build + cmake --build reindexer/build -- -j 4 + sudo cmake --install reindexer/build - name: Build with Maven run: mvn --batch-mode --update-snapshots verify diff --git a/README.md b/README.md index 029801d..e2377f5 100644 --- a/README.md +++ b/README.md @@ -305,6 +305,72 @@ InnerJoins can be used as a condition in Where clause: ``` Note that usually Or operator implements short-circuiting for Where conditions: if the previous condition is true the next one is not evaluated. But in case of InnerJoin it works differently: in query1 (from the example above) both InnerJoin conditions are evaluated despite the result of WhereInt. Limit(0) as part of InnerJoin (query3 from the example above) does not join any data - it works like a filter only to verify conditions. +### Query Expressions + +#### Functions +Reindexer provides built-in functions that can be used within `WHERE` clauses to enable advanced filtering capabilities beyond simple field comparisons. + +##### flat_array_len(field_name) +The `flat_array_len` function returns the length or cardinality of a specified field, making it particularly useful for filtering based on array sizes or field presence. +The `flat_array_len` function can be used in both `SELECT` and `UPDATE` queries. + +Behavior by Field Type: +- Array Fields: returns the number of elements in the array +- Scalar Fields (integers, strings, etc.): always returns 1 +- Object Fields: always returns 1 +- Nested Array Elements: returns the count of occurrences when the field is nested within arrays + +Examples: + +```java +// Find social media posts with between 10 and 50 comments +// and at least 3 attached media files. +List posts = db.query("posts", Post.class) + .where(Expression.flatArrayLength("comments"), RANGE, Expression.values(10, 50)) + .where(Expression.flatArrayLength("media"), GE, Expression.values(3)) + .toList(); + +// Update field 'size' with flat_array_len function. +db.query("posts", Post.class) + .where("id", EQ, 1) + .setExpression("size", Expression.string("flat_array_len(comments)")) + .update(); +``` + +Notes: +- `flat_array_len` function operates efficiently on indexed fields +- Returns 0 if the specified field does not exist in a document +- Supports the following comparison operators: (`=`, `>`, `>=`, `<`, `<=`, `Range`, `Set`) +- Can be used in both `SELECT` and `UPDATE` queries + +##### now(unit) +The `now()` function returns the current system timestamp, making it particularly useful for time-based filtering and data synchronization. +This function can be used in both `SELECT` and `UPDATE` queries. + +Arguments: +- `sec` - returns timestamp in seconds (default if no argument is provided) +- `msec` - returns timestamp in milliseconds +- `usec` - returns timestamp in microseconds +- `nsec` - returns timestamp in nanoseconds + +```java +// Find events that occurred in the past. +List events = db.query("events", Event.class) + .where(Expression.field("timestamp"), LE, Expression.now(TimeUnit.SECONDS)) + .toList(); + +db.query("items", Item.class) + .where("id", EQ, 42) + .setExpression("updated_at", Expression.string("now(usec)")) + .update(); +``` + +Notes: +- The returned timestamp represents seconds (or subunits) since the Unix epoch (January 1, 1970) +- Time resolution depends on the specified unit - use `nsec` for maximum precision +- All instances of `now()` within a single query share the same value, which is computed at the start of the query execution +- Useful for implementing TTL (Time-To-Live) functionality and audit logging + ### Transactions and batch update Reindexer supports transactions. Transaction are performs atomic namespace update. There are synchronous and diff --git a/src/main/java/ru/rt/restream/reindexer/Query.java b/src/main/java/ru/rt/restream/reindexer/Query.java index 73ffa1a..66311f2 100644 --- a/src/main/java/ru/rt/restream/reindexer/Query.java +++ b/src/main/java/ru/rt/restream/reindexer/Query.java @@ -20,12 +20,14 @@ import ru.rt.restream.reindexer.annotations.Hnsw; import ru.rt.restream.reindexer.annotations.Ivf; import ru.rt.restream.reindexer.annotations.VecBf; -import ru.rt.restream.reindexer.binding.Consts; import ru.rt.restream.reindexer.binding.QueryResult; import ru.rt.restream.reindexer.binding.RequestContext; import ru.rt.restream.reindexer.binding.TransactionContext; import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; import ru.rt.restream.reindexer.binding.cproto.cjson.PayloadType; +import ru.rt.restream.reindexer.expression.WhereExpression; +import ru.rt.restream.reindexer.expression.Expression; +import ru.rt.restream.reindexer.expression.SetExpression; import ru.rt.restream.reindexer.util.JsonSerializer; import ru.rt.restream.reindexer.util.Pair; import ru.rt.restream.reindexer.vector.params.KnnSearchParam; @@ -36,10 +38,10 @@ import java.util.Collections; import java.util.Deque; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.Spliterator; import java.util.Spliterators; -import java.util.UUID; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -56,8 +58,6 @@ import static ru.rt.restream.reindexer.binding.Consts.MERGE; import static ru.rt.restream.reindexer.binding.Consts.MODE_ACCURATE_TOTAL; import static ru.rt.restream.reindexer.binding.Consts.OR_INNER_JOIN; -import static ru.rt.restream.reindexer.binding.Consts.VALUE_BOOL; -import static ru.rt.restream.reindexer.binding.Consts.VALUE_NULL; import static ru.rt.restream.reindexer.binding.Consts.VALUE_STRING; /** @@ -112,6 +112,7 @@ public class Query { private static final int QUERY_FIELD_SUB_QUERY_CONDITION = 30; private static final int QUERY_LOCAL = 31; private static final int QUERY_KNN_CONDITION = 32; + private static final int QUERY_EXPRESSION_CONDITION = 36; /** * Condition types. @@ -403,7 +404,7 @@ public Query where(String indexName, Condition condition, Object... values) { buffer.putVarUInt32(values.length); for (Object key : values) { - putValue(key); + buffer.putValue(key); } return this; @@ -444,7 +445,7 @@ public Query where(Query subquery, Condition condition, Object... values) buffer.putVarUInt32(values.length); for (Object key : values) { - putValue(key); + buffer.putValue(key); } return this; @@ -472,6 +473,51 @@ public Query where(String indexName, Condition condition, Query subquery) return this; } + /** + * Where predicate between {@code left} and {@code right} expressions using {@code condition}. + * Supported combinations: + *
    + *
  • {@link Expression#field(String) field} condition {@link Expression#values(Object...) values}
  • + *
  • {@link Expression#field(String) field} condition {@link Expression#now(TimeUnit) now(unit)}
  • + *
  • {@link Expression#field(String) field} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}
  • + *
  • {@link Expression#field(String) field} condition {@link Expression#subQuery(Query) subquery}
  • + *
  • {@link Expression#field(String) field} condition {@link Expression#field(String) field}
  • + *
  • {@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#values(Object...) values}
  • + *
  • {@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#subQuery(Query) subquery}
  • + *
  • {@link Expression#subQuery(Query) subquery} condition {@link Expression#values(Object...) values}
  • + *
  • {@link Expression#subQuery(Query) subquery} condition {@link Expression#now(TimeUnit) now(unit)}
  • + *
  • {@link Expression#subQuery(Query) subquery} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}
  • + *
+ * Example: {@code where(Expression.field("created_at"), Condition.LE, Expression.now())}. + *

+ * + * @param left the left operand {@link Expression} to use + * @param condition the {@link Condition} to use + * @param right the right operand {@link Expression} to use + * @return the {@link Query} for further customizations + * @see Expression for factory methods to build different types of expressions + */ + public Query where(WhereExpression left, Condition condition, WhereExpression right) { + Objects.requireNonNull(left, "left expression cannot be null"); + Objects.requireNonNull(right, "right expression cannot be null"); + + logBuilder.where(nextOperation, left, condition.code, right); + + buffer.putVarUInt32(QUERY_EXPRESSION_CONDITION); + + left.serializeWhere(buffer); + + buffer.putVarUInt32(nextOperation); + buffer.putVarUInt32(condition.code); + + right.serializeWhere(buffer); + + nextOperation = OP_AND; + queryCount++; + + return this; + } + /** * The condition are possible only on the vector indexed fields, * marked with {@link Hnsw}, {@link Ivf}, {@link VecBf} annotations. @@ -785,7 +831,7 @@ public Query sort(String index, boolean desc, Object... values) { buffer.putVarUInt32(values.length); for (Object value : values) { - putValue(value); + buffer.putValue(value); } return this; @@ -803,55 +849,6 @@ public Query fetchCount(int fetchCount) { return this; } - private void putValue(Object value) { - if (value == null) { - buffer.putVarUInt32(VALUE_NULL); - } else if (value instanceof Boolean) { - buffer.putVarUInt32(VALUE_BOOL); - if ((Boolean) value) { - buffer.putVarUInt32(1); - } else { - buffer.putVarUInt32(0); - } - } else if (value instanceof Integer) { - buffer.putVarUInt32(Consts.VALUE_INT) - .putVarInt64((Integer) value); - } else if (value instanceof String) { - buffer.putVarUInt32(Consts.VALUE_STRING) - .putVString((String) value); - } else if (value instanceof Long) { - buffer.putVarUInt32(Consts.VALUE_INT_64) - .putVarInt64((Long) value); - } else if (value instanceof Byte) { - buffer.putVarUInt32(Consts.VALUE_INT) - .putVarInt64((Byte) value); - } else if (value instanceof Short) { - buffer.putVarUInt32(Consts.VALUE_INT) - .putVarInt64((Short) value); - } else if (value instanceof Double) { - buffer.putVarUInt32(Consts.VALUE_DOUBLE) - .putDouble((Double) value); - } else if (value instanceof Float) { - Float floatValue = (Float) value; - buffer.putVarUInt32(Consts.VALUE_DOUBLE) - .putDouble(floatValue.doubleValue()); - } else if (value instanceof Character) { - Character character = (Character) value; - buffer.putVarUInt32(Consts.VALUE_STRING) - .putVString(character.toString()); - } else if (value instanceof UUID) { - buffer.putVarUInt32(Consts.VALUE_UUID) - .putUuid((UUID) value); - } else if (value instanceof Object[]) { - buffer.putVarUInt32(Consts.VALUE_TUPLE); - Object[] objects = (Object[]) value; - buffer.putVarUInt32(objects.length); - for (Object object : objects) { - putValue(object); - } - } - } - /** * Will execute query, and return stream of items. * The returned stream must be closed using the {@link Stream#close()} method or @@ -1122,7 +1119,7 @@ public Query set(String fieldName, Object value) { buffer.putVarUInt32(values.size()); for (Object v : values) { buffer.putVarUInt32(0); - putValue(v); + buffer.putValue(v); } } else if (value != null && value.getClass().isArray()) { Object[] values = (Object[]) value; @@ -1137,19 +1134,43 @@ public Query set(String fieldName, Object value) { buffer.putVarUInt32(values.length); for (Object v : values) { buffer.putVarUInt32(0); - putValue(v); + buffer.putValue(v); } } else { buffer.putVarUInt32(cmd); buffer.putVString(fieldName); buffer.putVarUInt32(1); buffer.putVarUInt32(0); - putValue(value); + buffer.putValue(value); } return this; } + /** + * Updates indexed field by {@link SetExpression}. + *

+ * Example: {@code setExpression("created_at", Expression.string("now() - 1 * 24 * 60 * 60"))}. + *

+ * + * @param fieldName the field name to use + * @param expression the expression to use + * @return the {@link Query} for further customizations + * @see Expression for factory methods to build different types of expressions + */ + public Query setExpression(String fieldName, SetExpression expression) { + Objects.requireNonNull(expression, "expression cannot be null"); + + logBuilder.set(fieldName, expression); + + buffer.putVarUInt32(QUERY_UPDATE_FIELD); + buffer.putVString(fieldName); + + expression.serializeSet(buffer); + + return this; + } + private void setObject(String fieldName, Object value) { boolean isArray = false; int count = 1; @@ -1286,4 +1307,23 @@ String getSql() { return logBuilder.getSql(); } + /** + * Returns all used bytes from the {@link ByteBuffer}. + * + * @return all used bytes from the {@code ByteBuffer} + */ + public byte[] bytes() { + return buffer.bytes(); + } + + /** + * Returns the string representation of the query. + * + * @return the string representation of the query + */ + @Override + public String toString() { + return getSql(); + } + } diff --git a/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java b/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java index 7338dc5..5ebabdd 100644 --- a/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java +++ b/src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java @@ -16,6 +16,7 @@ package ru.rt.restream.reindexer; import org.apache.commons.lang3.StringUtils; +import ru.rt.restream.reindexer.expression.Expression; import ru.rt.restream.reindexer.vector.params.KnnSearchParam; import java.util.ArrayDeque; @@ -59,7 +60,7 @@ private static class AggregateEntry { private static class QueryEntry { private Operation operation; - private String field; + private Object field; private Condition condition; private String secondField; private int joinIndex = -1; @@ -299,6 +300,20 @@ void where(int operationCode, String field, int conditionCode, Object... values) } } + void where(int operationCode, Expression left, int conditionCode, Expression right) { + QueryEntry queryEntry = new QueryEntry(); + queryEntry.operation = getOperation(operationCode); + queryEntry.field = left; + queryEntry.condition = getCondition(conditionCode); + queryEntry.values.add(right); + if (!whereStack.isEmpty()) { + QueryEntry parent = whereStack.getLast(); + parent.children.add(queryEntry); + } else { + whereEntries.add(queryEntry); + } + } + void whereKnn(int operationCode, String indexName, float[] vector, KnnSearchParam params) { QueryEntry queryEntry = new QueryEntry(); queryEntry.operation = getOperation(operationCode); diff --git a/src/main/java/ru/rt/restream/reindexer/TimeUnit.java b/src/main/java/ru/rt/restream/reindexer/TimeUnit.java new file mode 100644 index 0000000..93eee62 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/TimeUnit.java @@ -0,0 +1,71 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer; + +import lombok.Getter; + +import java.util.Objects; + +/** + * Represents a time unit to use in date functions. + */ +public enum TimeUnit { + + /** + * Nanoseconds time unit. + */ + NANOS("nsec"), + + /** + * Microseconds time unit. + */ + MICROS("usec"), + + /** + * Milliseconds time unit. + */ + MILLIS("msec"), + + /** + * Seconds time unit. + */ + SECONDS("sec"), + ; + + /** + * Returns the {@code TimeUnit} for the given {@code name}. + * + * @param name the time unit name + * @return the {@link TimeUnit} to use + */ + public static TimeUnit fromName(String name) { + Objects.requireNonNull(name, "name cannot be null"); + for (TimeUnit timeUnit : TimeUnit.values()) { + if (timeUnit.name.equalsIgnoreCase(name)) { + return timeUnit; + } + } + throw new IllegalArgumentException("Unsupported TimeUnit name: " + name); + } + + @Getter + private final String name; + + TimeUnit(String name) { + this.name = name; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/binding/cproto/ByteBuffer.java b/src/main/java/ru/rt/restream/reindexer/binding/cproto/ByteBuffer.java index 95743a2..edeb571 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/cproto/ByteBuffer.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/cproto/ByteBuffer.java @@ -15,8 +15,12 @@ */ package ru.rt.restream.reindexer.binding.cproto; +import ru.rt.restream.reindexer.binding.Consts; + +import java.lang.reflect.Array; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collection; import java.util.UUID; /** @@ -277,6 +281,55 @@ public ByteBuffer putVBytes(byte[] value) { return this; } + /** + * Encodes an object value. Inserts array or collection length and + * encodes each element in the backed byte array recursively + * using the variable-length encoding from + * + * Google Protocol Buffers. It uses zigzag encoding to efficiently + * encode signed values. + * Increments buffer position. + * + * @param value the object value to encode + */ + public void putValue(Object value) { + if (value == null) { + putVarUInt32(Consts.VALUE_NULL); + } else if (value instanceof Boolean) { + putVarUInt32(Consts.VALUE_BOOL).putVarUInt32((Boolean) value ? 1L : 0L); + } else if (value instanceof Short) { + putVarUInt32(Consts.VALUE_INT).putVarInt64(((Short) value)); + } else if (value instanceof Integer) { + putVarUInt32(Consts.VALUE_INT).putVarInt64(((Integer) value)); + } else if (value instanceof Long) { + putVarUInt32(Consts.VALUE_INT_64).putVarInt64((Long) value); + } else if (value instanceof Float) { + putVarUInt32(Consts.VALUE_DOUBLE).putFloat(((Float) value)); + } else if (value instanceof Double) { + putVarUInt32(Consts.VALUE_DOUBLE).putDouble((Double) value); + } else if (value instanceof Character) { + putVarUInt32(Consts.VALUE_STRING).putVString(value.toString()); + } else if (value instanceof UUID) { + putVarUInt32(Consts.VALUE_UUID).putUuid((UUID) value); + } else if (value instanceof String) { + putVarUInt32(Consts.VALUE_STRING).putVString((String) value); + } else if (value instanceof Collection) { + putVarUInt32(Consts.VALUE_TUPLE); + Collection values = (Collection) value; + putVarUInt32(values.size()); + values.forEach(this::putValue); + } else if (value.getClass().isArray()) { + putVarUInt32(Consts.VALUE_TUPLE); + int length = Array.getLength(value); + putVarUInt32(length); + for (int i = 0; i < length; i++) { + putValue(Array.get(value, i)); + } + } else { + throw new IllegalArgumentException("Unsupported data type " + value.getClass()); + } + } + /** * Writes specified byte array into buffer. * Increments buffer position. diff --git a/src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalConnection.java b/src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalConnection.java index 966650f..e6ad5bd 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalConnection.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/cproto/PhysicalConnection.java @@ -288,22 +288,7 @@ private byte[] encodeArgs(Object[] args) { ByteBuffer buffer = new ByteBuffer(); buffer.putVarUInt32(args.length); for (Object arg : args) { - if (arg instanceof Boolean) { - buffer.putVarUInt32(Consts.VALUE_BOOL) - .putVarUInt32((Boolean) arg ? 1L : 0L); - } else if (arg instanceof Short) { - buffer.putVarUInt32(Consts.VALUE_INT) - .putVarInt64(((Short) arg)); - } else if (arg instanceof Integer) { - buffer.putVarUInt32(Consts.VALUE_INT) - .putVarInt64(((Integer) arg)); - } else if (arg instanceof Long) { - buffer.putVarUInt32(Consts.VALUE_INT_64) - .putVarInt64((Long) arg); - } else if (arg instanceof String) { - buffer.putVarUInt32(Consts.VALUE_STRING) - .putVString((String) arg); - } else if (arg instanceof byte[]) { + if (arg instanceof byte[]) { buffer.putVarUInt32(Consts.VALUE_STRING) .putVBytes(((byte[]) arg)); } else if (arg instanceof long[]) { @@ -316,7 +301,7 @@ private byte[] encodeArgs(Object[] args) { } buffer.putVBytes(arrayBuffer.bytes()); } else { - throw new IllegalArgumentException("Unsupported data type " + arg.getClass()); + buffer.putValue(arg); } } diff --git a/src/main/java/ru/rt/restream/reindexer/expression/Expression.java b/src/main/java/ru/rt/restream/reindexer/expression/Expression.java new file mode 100644 index 0000000..ae0b590 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/Expression.java @@ -0,0 +1,116 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.Query; +import ru.rt.restream.reindexer.TimeUnit; + +/** + * Represents a Reindexer expression, provides factory methods to creates different types of {@code Expression}. + * + * @see FieldExpression + * @see LiteralExpression + * @see StringExpression + * @see NowFunction + * @see FlatArrayLengthFunction + * @see SubQueryExpression + */ +public interface Expression { + + /** + * Creates a {@link FieldExpression} to use for field comparison. + * + * @param field the field name to use + * @return the {@link FieldExpression} to use + */ + static FieldExpression field(String field) { + return new FieldExpression(field); + } + + /** + * Creates a {@link LiteralExpression} to use for literal values comparison. + *

+ * Note: the expression expects raw values, the data type conversion is not supported. + * + * @param values the values to use + * @return the {@link LiteralExpression} to use + */ + static LiteralExpression values(Object... values) { + return new LiteralExpression(values); + } + + /** + * Creates a {@link StringExpression} to use for arithmetical expressions e.g., {@code now() - 1 * 24 * 60 * 60}. + *

+ * Note: the string-based expressions are currently only supported for update queries. + * + * @param expressionString the expression string to use + * @return the {@link StringExpression} to use + */ + static StringExpression string(String expressionString) { + return new StringExpression(expressionString); + } + + /** + * Creates a {@link NowFunction} to use for date comparison, defaults to {@link TimeUnit#SECONDS} time unit. + * + * @return the {@link NowFunction} to use + */ + static NowFunction now() { + return now(TimeUnit.SECONDS); + } + + /** + * Creates a {@link NowFunction} to use for date comparison with the specified {@code timeUnit}. + * + * @param unit the name of {@link TimeUnit} to use + * @return the {@link NowFunction} to use + */ + static NowFunction now(String unit) { + return now(TimeUnit.fromName(unit)); + } + + /** + * Creates a {@link NowFunction} to use for date comparison with the specified {@code timeUnit}. + * + * @param unit the {@link TimeUnit} to use + * @return the {@link NowFunction} to use + */ + static NowFunction now(TimeUnit unit) { + return new NowFunction(unit); + } + + /** + * Creates a {@link FlatArrayLengthFunction} to use for array comparison. + * + * @param fieldName the field name to use + * @return the {@link FlatArrayLengthFunction} to use + */ + static FlatArrayLengthFunction flatArrayLength(String fieldName) { + return new FlatArrayLengthFunction(fieldName); + } + + /** + * Creates a {@link SubQueryExpression} to use for comparison. + * + * @param subQuery the sub {@link Query} to use + * @return the {@link SubQueryExpression} to use + */ + static SubQueryExpression subQuery(Query subQuery) { + return new SubQueryExpression(subQuery); + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/FieldExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/FieldExpression.java new file mode 100644 index 0000000..7acf61f --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/FieldExpression.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +import java.util.Objects; + +/** + * Represents a field expression. + * + * @see Expression#field(String) + */ +public final class FieldExpression implements WhereExpression { + + private static final int EXPRESSION_TYPE = 0; + + private final String fieldName; + + /** + * Creates an instance. + * + * @param fieldName the field name to use + */ + FieldExpression(String fieldName) { + this.fieldName = Objects.requireNonNull(fieldName, "fieldName cannot be null"); + } + + @Override + public void serializeWhere(ByteBuffer buffer) { + buffer.putVarUInt32(EXPRESSION_TYPE); + buffer.putVString(fieldName); + } + + @Override + public String toString() { + return fieldName; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/FlatArrayLengthFunction.java b/src/main/java/ru/rt/restream/reindexer/expression/FlatArrayLengthFunction.java new file mode 100644 index 0000000..faa8c18 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/FlatArrayLengthFunction.java @@ -0,0 +1,53 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import java.util.Objects; + +/** + * Represents a {@code flat_array_len(field_name)} function expression. + */ +public final class FlatArrayLengthFunction extends FunctionExpression { + + private static final int FUNCTION_TYPE = 0; + + private final String fieldName; + + /** + * Creates an instance. + * + * @param fieldName the field name to use + */ + FlatArrayLengthFunction(String fieldName) { + this.fieldName = Objects.requireNonNull(fieldName, "fieldName cannot be null"); + } + + @Override + String[] getFields() { + return new String[]{fieldName}; + } + + @Override + int getFunctionType() { + return FUNCTION_TYPE; + } + + @Override + public String toString() { + return "flat_array_len(" + fieldName + ')'; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/FunctionExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/FunctionExpression.java new file mode 100644 index 0000000..602d10b --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/FunctionExpression.java @@ -0,0 +1,74 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import org.apache.commons.lang3.ArrayUtils; +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +/** + * A function expression, subclasses must provide {@link #getFunctionType()} + * and optionally {@link #getFields()} and/or {@link #getArguments()} if function + * supports fields and/or additional arguments. + * + * @see FlatArrayLengthFunction + * @see NowFunction + */ +abstract class FunctionExpression implements WhereExpression { + + private static final int EXPRESSION_TYPE = 2; + + @Override + public final void serializeWhere(ByteBuffer buffer) { + buffer.putVarUInt32(EXPRESSION_TYPE); + String[] fields = getFields(); + buffer.putVarUInt32(fields.length); + for (String field : fields) { + buffer.putVString(field); + } + Object[] arguments = getArguments(); + buffer.putVarUInt32(arguments.length); + for (Object argument : arguments) { + buffer.putValue(argument); + } + buffer.putVarUInt32(getFunctionType()); + } + + /** + * Returns field names applied to the function. + * + * @return the field names to use + */ + String[] getFields() { + return ArrayUtils.EMPTY_STRING_ARRAY; + } + + /** + * Returns arguments applied to the function. + * + * @return the arguments to use + */ + Object[] getArguments() { + return ArrayUtils.EMPTY_OBJECT_ARRAY; + } + + /** + * Returns the function type. + * + * @return the function type to use + */ + abstract int getFunctionType(); + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/LiteralExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/LiteralExpression.java new file mode 100644 index 0000000..1901f41 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/LiteralExpression.java @@ -0,0 +1,88 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +import java.lang.reflect.Array; +import java.util.Iterator; +import java.util.Objects; + +/** + * Represents a literal expression, e.g., {@code (1, 2, 3)}. + * + * @see Expression#values(Object...) + */ +public final class LiteralExpression implements WhereExpression { + + private static final int EXPRESSION_TYPE = 1; + + private final Object[] values; + + /** + * Creates an instance. + * + * @param values the literal values to use + */ + LiteralExpression(Object[] values) { + this.values = Objects.requireNonNull(values, "values cannot be null"); + } + + @Override + public void serializeWhere(ByteBuffer buffer) { + buffer.putVarUInt32(EXPRESSION_TYPE); + buffer.putVarUInt32(values.length); + for (Object value : values) { + buffer.putValue(value); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + appendValue(sb, values); + return sb.toString(); + } + + private void appendValue(StringBuilder sb, Object value) { + if (value instanceof String) { + sb.append("'").append(value).append("'"); + } else if (value instanceof Iterable) { + sb.append('('); + for (Iterator it = ((Iterable) value).iterator(); it.hasNext(); ) { + Object next = it.next(); + appendValue(sb, next); + if (it.hasNext()) { + sb.append(", "); + } + } + sb.append(')'); + } else if (value != null && value.getClass().isArray()) { + sb.append('('); + int length = Array.getLength(value); + for (int i = 0; i < length; i++) { + if (i > 0) { + sb.append(", "); + } + appendValue(sb, Array.get(value, i)); + } + sb.append(')'); + } else { + sb.append(value); + } + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/NowFunction.java b/src/main/java/ru/rt/restream/reindexer/expression/NowFunction.java new file mode 100644 index 0000000..7f420a4 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/NowFunction.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.TimeUnit; + +import java.util.Objects; + +/** + * Represents {@code now(timeUnit)} function expression. + * + * @see Expression#now() + * @see Expression#now(String) + * @see Expression#now(TimeUnit) + */ +public final class NowFunction extends FunctionExpression { + + private static final int FUNCTION_TYPE = 1; + + private final TimeUnit unit; + + /** + * Creates an instance. + * + * @param unit the {@link TimeUnit} to use + */ + NowFunction(TimeUnit unit) { + this.unit = Objects.requireNonNull(unit, "unit cannot be null"); + } + + @Override + Object[] getArguments() { + return new Object[]{unit.getName()}; + } + + @Override + int getFunctionType() { + return FUNCTION_TYPE; + } + + @Override + public String toString() { + return "now(" + unit.getName() + ')'; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/SetExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/SetExpression.java new file mode 100644 index 0000000..60a30ce --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/SetExpression.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +/** + * An expression that is used in {@code SET} clause. + */ +public interface SetExpression extends Expression { + + /** + * Serializes the expression for {@code SET} clause to the given {@code buffer}. + * + * @param buffer the {@link ByteBuffer} to use + */ + void serializeSet(ByteBuffer buffer); + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/StringExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/StringExpression.java new file mode 100644 index 0000000..8be0865 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/StringExpression.java @@ -0,0 +1,52 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.binding.Consts; +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +import java.util.Objects; + +/** + * Represents a string expression e.g., {@code now() - 1 * 24 * 60 * 60}. + */ +public final class StringExpression implements SetExpression { + + private final String expressionString; + + /** + * Creates an instance. + * + * @param expressionString the expression string to use + */ + StringExpression(String expressionString) { + this.expressionString = Objects.requireNonNull(expressionString, "expressionString cannot be null"); + } + + @Override + public void serializeSet(ByteBuffer buffer) { + buffer.putVarUInt32(1); // size. + buffer.putVarUInt32(1); // is expression. + buffer.putVarUInt32(Consts.VALUE_STRING); + buffer.putVString(expressionString); + } + + @Override + public String toString() { + return expressionString; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/SubQueryExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/SubQueryExpression.java new file mode 100644 index 0000000..41e5732 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/SubQueryExpression.java @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.Query; +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +import java.util.Objects; + +/** + * Represents a subquery expression. + * + * @see Expression#subQuery(Query) + */ +public final class SubQueryExpression implements WhereExpression { + + private static final int EXPRESSION_TYPE = 3; + + private final Query subQuery; + + /** + * Creates an instance. + * + * @param subQuery the {@link Query subquery} to use + */ + SubQueryExpression(Query subQuery) { + this.subQuery = Objects.requireNonNull(subQuery, "subQuery cannot be null"); + } + + @Override + public void serializeWhere(ByteBuffer buffer) { + buffer.putVarUInt32(EXPRESSION_TYPE); + buffer.putVBytes(subQuery.bytes()); + } + + @Override + public String toString() { + return '(' + subQuery.toString() + ')'; + } + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/WhereExpression.java b/src/main/java/ru/rt/restream/reindexer/expression/WhereExpression.java new file mode 100644 index 0000000..e9594b4 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/WhereExpression.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer.expression; + +import ru.rt.restream.reindexer.binding.cproto.ByteBuffer; + +/** + * An expression that is used in {@code WHERE} clause. + */ +public interface WhereExpression extends Expression { + + /** + * Serializes the expression for {@code WHERE} clause to the given {@code buffer}. + * + * @param buffer the {@link ByteBuffer} to use + */ + void serializeWhere(ByteBuffer buffer); + +} diff --git a/src/main/java/ru/rt/restream/reindexer/expression/package-info.java b/src/main/java/ru/rt/restream/reindexer/expression/package-info.java new file mode 100644 index 0000000..f2f8303 --- /dev/null +++ b/src/main/java/ru/rt/restream/reindexer/expression/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * This package defines classes used to support expressions. + */ +package ru.rt.restream.reindexer.expression; diff --git a/src/test/java/ru/rt/restream/reindexer/TimeUnitTest.java b/src/test/java/ru/rt/restream/reindexer/TimeUnitTest.java new file mode 100644 index 0000000..0ff6d53 --- /dev/null +++ b/src/test/java/ru/rt/restream/reindexer/TimeUnitTest.java @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Restream + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ru.rt.restream.reindexer; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TimeUnitTest { + + @ParameterizedTest + @CsvSource({ + "nsec,NANOS", + "usec,MICROS", + "msec,MILLIS", + "sec,SECONDS" + }) + void fromNameWhenMatchesThenReturnsTimeUnit(String name, TimeUnit timeUnit) { + assertThat(TimeUnit.fromName(name), is(timeUnit)); + } + + @Test + void fromNameWhenNameNullThenException() { + NullPointerException thrown = assertThrows( + NullPointerException.class, + () -> TimeUnit.fromName(null) + ); + assertThat(thrown.getMessage(), is("name cannot be null")); + } + + @Test + void fromNameWhenDoesNotMatchThenException() { + IllegalArgumentException thrown = assertThrows( + IllegalArgumentException.class, + () -> TimeUnit.fromName("unknown") + ); + assertThat(thrown.getMessage(), is("Unsupported TimeUnit name: unknown")); + } + +} diff --git a/src/test/java/ru/rt/restream/reindexer/connector/ReindexerTest.java b/src/test/java/ru/rt/restream/reindexer/connector/ReindexerTest.java index 6654144..1fb0c7d 100644 --- a/src/test/java/ru/rt/restream/reindexer/connector/ReindexerTest.java +++ b/src/test/java/ru/rt/restream/reindexer/connector/ReindexerTest.java @@ -22,11 +22,16 @@ import lombok.NoArgsConstructor; import lombok.Setter; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import ru.rt.restream.reindexer.EnumType; import ru.rt.restream.reindexer.Namespace; import ru.rt.restream.reindexer.NamespaceOptions; +import ru.rt.restream.reindexer.Query; import ru.rt.restream.reindexer.QueryResultJsonIterator; import ru.rt.restream.reindexer.ResultIterator; +import ru.rt.restream.reindexer.TimeUnit; import ru.rt.restream.reindexer.Transaction; import ru.rt.restream.reindexer.annotations.Convert; import ru.rt.restream.reindexer.annotations.Enumerated; @@ -35,10 +40,14 @@ import ru.rt.restream.reindexer.convert.FieldConverter; import ru.rt.restream.reindexer.convert.FieldConverterRegistryFactory; import ru.rt.restream.reindexer.db.DbBaseTest; +import ru.rt.restream.reindexer.expression.Expression; import ru.rt.restream.reindexer.util.JsonSerializer; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -76,6 +85,7 @@ import static ru.rt.restream.reindexer.Query.Condition.ALLSET; import static ru.rt.restream.reindexer.Query.Condition.EQ; import static ru.rt.restream.reindexer.Query.Condition.LE; +import static ru.rt.restream.reindexer.Query.Condition.LT; import static ru.rt.restream.reindexer.Query.Condition.RANGE; import static ru.rt.restream.reindexer.Query.Condition.SET; @@ -1096,6 +1106,27 @@ public void testUpdateItemListPrimitivesToEmptyArray() { assertThat(updatedItem.integers.isEmpty(), is(true)); } + @Test + public void testUpdateItemFieldToExpression() { + FieldConverterRegistryFactory.INSTANCE.registerFieldConverter(TestItemCreatedAt.class, "createdAt", new LocalDateTimeUnitFieldConverter(TimeUnit.SECONDS)); + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItemCreatedAt.class); + LocalDateTime now = LocalDateTime.now(); + TestItemCreatedAt testItem = new TestItemCreatedAt(); + testItem.id = 1; + testItem.createdAt = now; + db.upsert(namespaceName, testItem); + db.query(namespaceName, TestItemCreatedAt.class) + .where("id", EQ, 1) + .setExpression("createdAt", Expression.string("now() - 1 * 24 * 60 * 60")) + .update(); + TestItemCreatedAt found = db.query(namespaceName, TestItemCreatedAt.class) + .where("id", EQ, 1) + .getOne(); + assertThat(found.id, is(1)); + assertThat(found.createdAt.toLocalDate(), is(now.toLocalDate().minusDays(1))); + } + @Test public void testInsertItemWithDoubleValue() { String namespaceName = "items"; @@ -2925,6 +2956,161 @@ public void testTestItemDefaultStringConverters() { assertThat(foundByDefaultWriting.defaultWriting, is("default")); } + @Test + public void testQueryWhereNowExpressionDefaultsToSeconds() { + FieldConverterRegistryFactory.INSTANCE.registerFieldConverter(TestItemCreatedAt.class, "createdAt", new LocalDateTimeUnitFieldConverter(TimeUnit.SECONDS)); + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItemCreatedAt.class); + LocalDateTime createdAt = LocalDateTime.now().minusHours(1); + db.upsert(namespaceName, new TestItemCreatedAt(1, createdAt)); + ResultIterator iterator = db.query(namespaceName, TestItemCreatedAt.class) + .where(Expression.field("createdAt"), LT, Expression.now()) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItemCreatedAt found = iterator.next(); + assertThat(found.id, is(1)); + assertThat(found.createdAt.truncatedTo(ChronoUnit.SECONDS), is(createdAt.truncatedTo(ChronoUnit.SECONDS))); + } + + @Test + public void testQueryWhereNowExpressionUsesTimeUnitName() { + FieldConverterRegistryFactory.INSTANCE.registerFieldConverter(TestItemCreatedAt.class, "createdAt", new LocalDateTimeUnitFieldConverter(TimeUnit.MILLIS)); + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItemCreatedAt.class); + LocalDateTime createdAt = LocalDateTime.now().minusHours(1); + db.upsert(namespaceName, new TestItemCreatedAt(1, createdAt)); + ResultIterator iterator = db.query(namespaceName, TestItemCreatedAt.class) + .where(Expression.field("createdAt"), LT, Expression.now("msec")) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItemCreatedAt found = iterator.next(); + assertThat(found.id, is(1)); + assertThat(found.createdAt.truncatedTo(ChronoUnit.SECONDS), is(createdAt.truncatedTo(ChronoUnit.SECONDS))); + } + + @ParameterizedTest + @MethodSource("nowExpressionArguments") + public void testQueryWhereFieldToNowExpression(LocalDateTime createdAt, Query.Condition condition, TimeUnit timeUnit) { + FieldConverterRegistryFactory.INSTANCE.registerFieldConverter(TestItemCreatedAt.class, "createdAt", new LocalDateTimeUnitFieldConverter(timeUnit)); + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItemCreatedAt.class); + db.upsert(namespaceName, new TestItemCreatedAt(1, createdAt)); + ResultIterator iterator = db.query(namespaceName, TestItemCreatedAt.class) + .where(Expression.field("createdAt"), condition, Expression.now(timeUnit)) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItemCreatedAt found = iterator.next(); + assertThat(found.id, is(1)); + assertThat(found.createdAt.truncatedTo(ChronoUnit.SECONDS), is(createdAt.truncatedTo(ChronoUnit.SECONDS))); + } + + private static Stream nowExpressionArguments() { + LocalDateTime now = LocalDateTime.now(); + LocalDateTime past = now.minusHours(1); + LocalDateTime future = now.plusHours(1); + // Skip EQ tests (e.g., created_at = now()) to avoid flaky results due to I/O latency. + return Stream.of( + Arguments.of(past, Query.Condition.LT, TimeUnit.NANOS), + Arguments.of(past, Query.Condition.LT, TimeUnit.MICROS), + Arguments.of(past, Query.Condition.LT, TimeUnit.MILLIS), + Arguments.of(past, Query.Condition.LT, TimeUnit.SECONDS), + Arguments.of(past, Query.Condition.LE, TimeUnit.NANOS), + Arguments.of(past, Query.Condition.LE, TimeUnit.MICROS), + Arguments.of(past, Query.Condition.LE, TimeUnit.MILLIS), + Arguments.of(past, Query.Condition.LE, TimeUnit.SECONDS), + Arguments.of(future, Query.Condition.GT, TimeUnit.NANOS), + Arguments.of(future, Query.Condition.GT, TimeUnit.MICROS), + Arguments.of(future, Query.Condition.GT, TimeUnit.MILLIS), + Arguments.of(future, Query.Condition.GT, TimeUnit.SECONDS), + Arguments.of(future, Query.Condition.GE, TimeUnit.NANOS), + Arguments.of(future, Query.Condition.GE, TimeUnit.MICROS), + Arguments.of(future, Query.Condition.GE, TimeUnit.MILLIS), + Arguments.of(future, Query.Condition.GE, TimeUnit.SECONDS) + ); + } + + @ParameterizedTest + @MethodSource("literalExpressionArguments") + public void testQueryWhereFieldToLiteralExpression(Integer id, Query.Condition condition, Object[] values) { + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItem.class); + TestItem item = new TestItem(); + item.id = id; + db.upsert(namespaceName, item); + ResultIterator iterator = db.query(namespaceName, TestItem.class) + .where(Expression.field("id"), condition, Expression.values(values)) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItem found = iterator.next(); + assertThat(found.id, is(id)); + } + + private static Stream literalExpressionArguments() { + return Stream.of( + Arguments.of(1, Query.Condition.EQ, new Object[]{1}), + Arguments.of(1, Query.Condition.LT, new Object[]{2}), + Arguments.of(1, Query.Condition.LE, new Object[]{2}), + Arguments.of(1, Query.Condition.LE, new Object[]{1}), + Arguments.of(1, Query.Condition.GT, new Object[]{0}), + Arguments.of(1, Query.Condition.GE, new Object[]{0}), + Arguments.of(1, Query.Condition.GE, new Object[]{1}), + Arguments.of(1, Query.Condition.RANGE, new Object[]{0, 2}), + Arguments.of(1, Query.Condition.SET, new Object[]{0, 1, 2}) + ); + } + + @Test + public void testQueryWhereFieldToFieldExpression() { + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItem.class); + TestItem item = new TestItem(); + item.id = 1; + item.name = "Test"; + item.value = "Test"; + db.upsert(namespaceName, item); + ResultIterator iterator = db.query(namespaceName, TestItem.class) + .where(Expression.field("name"), EQ, Expression.field("value")) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItem found = iterator.next(); + assertThat(found.id, is(1)); + assertThat(found.name, is("Test")); + assertThat(found.value, is("Test")); + } + + @ParameterizedTest + @MethodSource("flatArrayLengthExpressionArguments") + public void testQueryWhereFlatArrayLengthToLiteralExpression(List integers, Query.Condition condition, Object[] values) { + String namespaceName = "items"; + db.openNamespace(namespaceName, NamespaceOptions.defaultOptions(), TestItem.class); + TestItem item = new TestItem(); + item.id = 1; + item.integers = integers; + db.upsert(namespaceName, item); + ResultIterator iterator = db.query(namespaceName, TestItem.class) + .where(Expression.flatArrayLength("integers"), condition, Expression.values(values)) + .execute(); + assertThat(iterator.hasNext(), is(true)); + TestItem found = iterator.next(); + assertThat(found.id, is(1)); + assertThat(found.integers, is(integers)); + } + + private static Stream flatArrayLengthExpressionArguments() { + List integers = Arrays.asList(1, 2, 3); + return Stream.of( + Arguments.of(integers, Query.Condition.EQ, new Object[]{3}), + Arguments.of(integers, Query.Condition.LT, new Object[]{4}), + Arguments.of(integers, Query.Condition.LE, new Object[]{4}), + Arguments.of(integers, Query.Condition.LE, new Object[]{3}), + Arguments.of(integers, Query.Condition.GT, new Object[]{2}), + Arguments.of(integers, Query.Condition.GE, new Object[]{2}), + Arguments.of(integers, Query.Condition.GE, new Object[]{3}), + Arguments.of(integers, Query.Condition.RANGE, new Object[]{1, 4}), + Arguments.of(integers, Query.Condition.SET, new Object[]{1, 2, 3, 4}) + ); + } + @Getter @Setter public static class SerialIdTestItem { @@ -3173,6 +3359,18 @@ public static class TestItemDefaultStringConverters { private String defaultWriting; } + @Getter + @Setter + @NoArgsConstructor + @AllArgsConstructor + public static class TestItemCreatedAt { + @Reindex(name = "id", isPrimaryKey = true) + private Integer id; + + @Reindex(name = "createdAt") + private LocalDateTime createdAt; + } + @Getter @Setter @NoArgsConstructor @@ -3194,6 +3392,64 @@ public String convertToDatabaseType(LocalDate localDate) { } } + public static class LocalDateTimeUnitFieldConverter implements FieldConverter { + + private final TimeUnit timeUnit; + + public LocalDateTimeUnitFieldConverter(TimeUnit timeUnit) { + this.timeUnit = timeUnit; + } + + @Override + public LocalDateTime convertToFieldType(Long ldt) { + return ldt != null ? fromEpoch(ldt) : null; + } + + @Override + public Long convertToDatabaseType(LocalDateTime ldt) { + return ldt != null ? toEpoch(ldt) : null; + } + + private LocalDateTime fromEpoch(long value) { + Instant instant = toInstant(value); + return LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); + } + + private Instant toInstant(long value) { + switch (timeUnit) { + case NANOS: + return Instant.ofEpochSecond(value / 1_000_000_000L, value % 1_000_000_000L); + case MICROS: + return Instant.ofEpochSecond(value / 1_000_000L, value % 1_000_000L); + case MILLIS: + return Instant.ofEpochMilli(value); + case SECONDS: + return Instant.ofEpochSecond(value); + default: + throw new IllegalArgumentException("Unsupported TimeUnit: " + timeUnit); + } + } + + private long toEpoch(LocalDateTime ldt) { + Instant instant = ldt.atZone(ZoneId.systemDefault()).toInstant(); + long seconds = instant.getEpochSecond(); + int nanos = instant.getNano(); + switch (timeUnit) { + case NANOS: + return seconds * 1_000_000_000L + nanos; + case MICROS: + return seconds * 1_000_000L + nanos / 1000; + case MILLIS: + return instant.toEpochMilli(); + case SECONDS: + return seconds; + default: + throw new IllegalArgumentException("Unsupported TimeUnit: " + timeUnit); + } + } + + } + public static class LocalDateTimeStringFieldConverter implements FieldConverter { @Override diff --git a/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java b/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java index 54652d3..93c9113 100644 --- a/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java +++ b/src/test/java/ru/rt/restream/reindexer/connector/SubQueryTest.java @@ -26,11 +26,13 @@ import ru.rt.restream.reindexer.ResultIterator; import ru.rt.restream.reindexer.annotations.Reindex; import ru.rt.restream.reindexer.db.DbBaseTest; +import ru.rt.restream.reindexer.expression.Expression; import java.util.ArrayList; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static ru.rt.restream.reindexer.Query.Condition.EQ; import static ru.rt.restream.reindexer.Query.Condition.GE; @@ -226,6 +228,99 @@ public void testWhereWithArgsQueryConditionValuesAndNoAgg() { assertThat(bannerNotExistsIterator.hasNext(), is(false)); } + @Test + public void testWhereFieldToSubQueryExpression() { + Namespace personsNs = db.openNamespace("persons", NamespaceOptions.defaultOptions(), Person.class); + for (int i = 0; i < 20; i++) { + int age = 9 + (i % 8) * 10; + String name = "Person" + i + "Age" + age; + personsNs.insert(new Person(i, name, age)); + } + // select * from persons p + // where p.age = (select max(age) from person) + Query maxAgeSubQuery = personsNs.query().aggregateMax("age"); + Query eldestPersonsQuery = personsNs.query() + .where(Expression.field("age"), EQ, Expression.subQuery(maxAgeSubQuery)); + ResultIterator iterator = eldestPersonsQuery.execute(); + List actualEldestPersons = new ArrayList<>(); + while (iterator.hasNext()) { + actualEldestPersons.add(iterator.next()); + } + assertThat(actualEldestPersons.size(), is(2)); + } + + @Test + public void testWhereSubQueryToLiteralExpression() { + Namespace bannersNs = db.openNamespace("banners", NamespaceOptions.defaultOptions(), Banner.class); + Namespace purchasesNs = db.openNamespace("purchases", NamespaceOptions.defaultOptions(), Purchase.class); + bannersNs.insert(new Banner(1, "Banner")); + int purchaseId = 0; + // 24 persons, everyone has from 0 to 3 purchases, for a total of 36 purchases. + for (int i = 0; i < 24; i++) { + for (int j = 0; j < i % 4; j++) { + int price = (j + 1) * 10; + purchasesNs.insert(new Purchase(purchaseId++, i, price, "Asset" + j)); + } + } + int personId = 14; + int sumPrices = 30; // 10 + 20 + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) = 30 + Query subQuery = purchasesNs.query() + .where("person_id", EQ, personId) + .aggregateSum("price"); + Query bannerExistsOnEqQuery = bannersNs.query() + .where("id", EQ, 1) + .where(Expression.subQuery(subQuery), EQ, Expression.values(sumPrices)); + ResultIterator bannerExistsOnEqIterator = bannerExistsOnEqQuery.execute(); + assertThat(bannerExistsOnEqIterator.hasNext(), is(true)); + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) >= 30 + Query bannerExistsQuery = bannersNs.query() + .where("id", EQ, 1) + .where(Expression.subQuery(subQuery), GE, Expression.values(sumPrices)); + ResultIterator bannerExistsIterator = bannerExistsQuery.execute(); + assertThat(bannerExistsIterator.hasNext(), is(true)); + // select * from banners b + // where b.id = 1 and (select sum(p.price) from purchases p where p.person_id = 14) < 30 + Query bannerNotExistsQuery = bannersNs.query() + .where("id", EQ, 1) + .where(Expression.subQuery(subQuery), LT, Expression.values(sumPrices)); + ResultIterator bannerNotExistsIterator = bannerNotExistsQuery.execute(); + assertThat(bannerNotExistsIterator.hasNext(), is(false)); + } + + @Test + public void testWhereFlatArrayLengthToSubQueryExpression() { + Namespace personsNs = db.openNamespace("persons", NamespaceOptions.defaultOptions(), Person.class); + Namespace purchasesNs = db.openNamespace("purchases", NamespaceOptions.defaultOptions(), Purchase.class); + Namespace personPurchasesNs = db.openNamespace("person_purchases", NamespaceOptions.defaultOptions(), PersonPurchases.class); + int personId = 0; + Person person = new Person(personId, "John Doe", 21); + personsNs.insert(person); + int purchaseSize = 5; + List purchaseIds = new ArrayList<>(purchaseSize); + for (int i = 0; i < purchaseSize; i++) { + Purchase purchase = new Purchase(i + 1, personId, (i + 1) * 10, "Asset" + i); + purchasesNs.insert(purchase); + purchaseIds.add(purchase.id); + } + personPurchasesNs.insert(new PersonPurchases(0, personId, purchaseIds)); + // select * from person_purchases pp + // where flat_array_len(pp.purchase_ids) = (select id from purchases where person_id = 0 order by 'id' desc limit 1) + Query latestPurchaseSubQuery = purchasesNs.query() + .select("id") + .where("person_id", EQ, personId) + .limit(1) + .sort("id", true); + Query personPurchasesQuery = personPurchasesNs.query() + .where(Expression.flatArrayLength("purchase_ids"), EQ, Expression.subQuery(latestPurchaseSubQuery)); + ResultIterator iterator = personPurchasesQuery.execute(); + assertThat(iterator.hasNext(), is(true)); + PersonPurchases personPurchases = iterator.next(); + assertThat(personPurchases.purchaseIds, hasSize(5)); + } + @Setter @Getter @NoArgsConstructor @@ -241,6 +336,22 @@ public static class Person { private int age; } + @Getter + @Setter + @NoArgsConstructor + @AllArgsConstructor + public static class PersonPurchases { + + @Reindex(name = "id", isPrimaryKey = true) + private int id; + + @Reindex(name = "person_id") + private int personId; + + @Reindex(name = "purchase_ids") + private List purchaseIds; + } + @Setter @Getter @NoArgsConstructor