Merged
15 changes: 11 additions & 4 deletions .github/workflows/pr-build-workflow.yml
@@ -14,10 +14,17 @@ jobs:
with:
java-version: 1.11
- name: Install Reindexer
+# TODO: Temporarily build reindexer from sources until https://github.com/Restream/reindexer/issues/92 is released.
+# run: |
+# sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc
+# echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list
+# sudo apt-get update
+# sudo apt-get install -y reindexer-dev libopenblas-pthread-dev
run: |
-sudo curl https://repo.reindexer.io/RX-KEY.GPG -o /etc/apt/trusted.gpg.d/reindexer.asc
-echo 'deb https://repo.reindexer.io/ubuntu-noble /' | sudo tee -a /etc/apt/sources.list
-sudo apt-get update
-sudo apt-get install -y reindexer-dev libopenblas-pthread-dev
+curl -L https://github.com/Restream/reindexer/raw/master/dependencies.sh | sudo bash -s
+git clone https://github.com/Restream/reindexer
+cmake reindexer -Breindexer/build
+cmake --build reindexer/build -- -j 4
+sudo cmake --install reindexer/build
- name: Build with Maven
run: mvn --batch-mode --update-snapshots verify
66 changes: 66 additions & 0 deletions README.md
@@ -305,6 +305,72 @@ InnerJoins can be used as a condition in Where clause:
```
Note that the Or operator usually short-circuits Where conditions: if the previous condition is true, the next one is not evaluated. InnerJoin works differently: in query1 (from the example above) both InnerJoin conditions are evaluated regardless of the result of WhereInt. Limit(0) as part of an InnerJoin (query3 from the example above) does not join any data; it acts only as a filter that verifies the conditions.

### Query Expressions

#### Functions
Reindexer provides built-in functions that can be used in `WHERE` conditions and in `UPDATE` expressions, enabling filtering and updates that go beyond simple field comparisons.

##### flat_array_len(field_name)
The `flat_array_len` function returns the length or cardinality of a specified field, making it particularly useful for filtering based on array sizes or field presence.
The `flat_array_len` function can be used in both `SELECT` and `UPDATE` queries.

Behavior by Field Type:
- Array Fields: returns the number of elements in the array
- Scalar Fields (integers, strings, etc.): always returns 1
- Object Fields: always returns 1
- Nested Array Elements: returns the count of occurrences when the field is nested within arrays

Examples:

```java
// Find social media posts with between 10 and 50 comments
// and at least 3 attached media files.
List<Post> posts = db.query("posts", Post.class)
.where(Expression.flatArrayLength("comments"), RANGE, Expression.values(10, 50))
.where(Expression.flatArrayLength("media"), GE, Expression.values(3))
.toList();

// Update field 'size' with flat_array_len function.
db.query("posts", Post.class)
.where("id", EQ, 1)
.setExpression("size", Expression.string("flat_array_len(comments)"))
.update();
```

Notes:
- `flat_array_len` function operates efficiently on indexed fields
- Returns 0 if the specified field does not exist in a document
- Supports the following comparison operators: (`=`, `>`, `>=`, `<`, `<=`, `Range`, `Set`)
- Can be used in both `SELECT` and `UPDATE` queries
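
The per-type rules above can be modelled in plain Java. This is only a sketch of the documented behavior, not Reindexer's implementation: the class `FlatArrayLenModel`, the method `flatArrayLen`, the dot-separated path syntax, and the representation of a document as nested `Map`/`List` values are all assumptions made for illustration.

```java
import java.util.List;
import java.util.Map;

class FlatArrayLenModel {

    // Hypothetical helper: counts the values found at a dot-separated path.
    static int flatArrayLen(Object node, String path) {
        if (node == null) {
            return 0; // the field does not exist in this document
        }
        if (node instanceof List) {
            // Arrays are flattened: sum the counts of all elements.
            int total = 0;
            for (Object element : (List<?>) node) {
                total += flatArrayLen(element, path);
            }
            return total;
        }
        if (path.isEmpty()) {
            return 1; // a scalar or an object counts as one value
        }
        if (!(node instanceof Map)) {
            return 0; // cannot descend into a scalar
        }
        int dot = path.indexOf('.');
        String head = dot < 0 ? path : path.substring(0, dot);
        String rest = dot < 0 ? "" : path.substring(dot + 1);
        return flatArrayLen(((Map<?, ?>) node).get(head), rest);
    }

    public static void main(String[] args) {
        Map<String, Object> first = Map.of("tags", List.of("a", "b"));
        Map<String, Object> second = Map.of("tags", List.of("c"));
        Map<String, Object> third = Map.of("name", "no tags here");
        Map<String, Object> doc = Map.of(
                "title", "hello",
                "comments", List.of("c1", "c2", "c3"),
                "items", List.of(first, second, third));

        System.out.println(flatArrayLen(doc, "comments"));   // 3: array field
        System.out.println(flatArrayLen(doc, "title"));      // 1: scalar field
        System.out.println(flatArrayLen(doc, "missing"));    // 0: absent field
        System.out.println(flatArrayLen(doc, "items.tags")); // 3: occurrences across nested arrays
    }
}
```

In this model an empty array yields 0, the same value as a missing field, so the count alone cannot tell the two cases apart.
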

##### now(unit)
The `now()` function returns the current system timestamp, making it particularly useful for time-based filtering and data synchronization.
This function can be used in both `SELECT` and `UPDATE` queries.

Arguments:
- `sec` - returns timestamp in seconds (default if no argument is provided)
- `msec` - returns timestamp in milliseconds
- `usec` - returns timestamp in microseconds
- `nsec` - returns timestamp in nanoseconds

```java
// Find events that occurred in the past.
List<Event> events = db.query("events", Event.class)
.where(Expression.field("timestamp"), LE, Expression.now(TimeUnit.SECONDS))
.toList();

// Set 'updated_at' to the current timestamp in microseconds.
db.query("items", Item.class)
.where("id", EQ, 42)
.setExpression("updated_at", Expression.string("now(usec)"))
.update();
```

Notes:
- The returned timestamp represents seconds (or subunits) since the Unix epoch (January 1, 1970)
- Time resolution depends on the specified unit - use `nsec` for maximum precision
- All instances of `now()` within a single query share the same value, which is computed at the start of the query execution
- Useful for implementing TTL (Time-To-Live) functionality and audit logging
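
To make the unit arguments concrete, the sketch below converts one fixed instant into each of the four units using `java.time.Instant`. It only illustrates what "time since the Unix epoch in a given unit" means; the class `NowUnitsModel` and the method `epoch` are hypothetical and are not part of the Reindexer client.

```java
import java.time.Instant;

class NowUnitsModel {

    // Hypothetical helper mirroring the documented units of now(unit).
    static long epoch(Instant instant, String unit) {
        long seconds = instant.getEpochSecond();
        int nanos = instant.getNano(); // nanoseconds within the current second
        switch (unit) {
            case "sec":
                return seconds;
            case "msec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000L), nanos / 1_000_000);
            case "usec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000_000L), nanos / 1_000);
            case "nsec":
                return Math.addExact(Math.multiplyExact(seconds, 1_000_000_000L), nanos);
            default:
                throw new IllegalArgumentException("Unknown unit: " + unit);
        }
    }

    public static void main(String[] args) {
        // A fixed instant keeps the output reproducible.
        Instant fixed = Instant.ofEpochSecond(1_700_000_000L, 123_456_789L);
        System.out.println(epoch(fixed, "sec"));  // 1700000000
        System.out.println(epoch(fixed, "msec")); // 1700000000123
        System.out.println(epoch(fixed, "usec")); // 1700000000123456
        System.out.println(epoch(fixed, "nsec")); // 1700000000123456789
    }
}
```

A stored timestamp has to be compared against `now()` in the same unit it was written with. A field filled by `now(usec)` and filtered with `now(sec)` would differ by a factor of one million.
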

### Transactions and batch update

Reindexer supports transactions. A transaction performs an atomic namespace update. There are synchronous and
158 changes: 99 additions & 59 deletions src/main/java/ru/rt/restream/reindexer/Query.java
@@ -20,12 +20,14 @@
import ru.rt.restream.reindexer.annotations.Hnsw;
import ru.rt.restream.reindexer.annotations.Ivf;
import ru.rt.restream.reindexer.annotations.VecBf;
-import ru.rt.restream.reindexer.binding.Consts;
import ru.rt.restream.reindexer.binding.QueryResult;
import ru.rt.restream.reindexer.binding.RequestContext;
import ru.rt.restream.reindexer.binding.TransactionContext;
import ru.rt.restream.reindexer.binding.cproto.ByteBuffer;
import ru.rt.restream.reindexer.binding.cproto.cjson.PayloadType;
import ru.rt.restream.reindexer.expression.WhereExpression;
import ru.rt.restream.reindexer.expression.Expression;
import ru.rt.restream.reindexer.expression.SetExpression;
import ru.rt.restream.reindexer.util.JsonSerializer;
import ru.rt.restream.reindexer.util.Pair;
import ru.rt.restream.reindexer.vector.params.KnnSearchParam;
@@ -36,10 +38,10 @@
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
-import java.util.UUID;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

@@ -56,8 +58,6 @@
import static ru.rt.restream.reindexer.binding.Consts.MERGE;
import static ru.rt.restream.reindexer.binding.Consts.MODE_ACCURATE_TOTAL;
import static ru.rt.restream.reindexer.binding.Consts.OR_INNER_JOIN;
-import static ru.rt.restream.reindexer.binding.Consts.VALUE_BOOL;
-import static ru.rt.restream.reindexer.binding.Consts.VALUE_NULL;
import static ru.rt.restream.reindexer.binding.Consts.VALUE_STRING;

/**
@@ -112,6 +112,7 @@ public class Query<T> {
private static final int QUERY_FIELD_SUB_QUERY_CONDITION = 30;
private static final int QUERY_LOCAL = 31;
private static final int QUERY_KNN_CONDITION = 32;
private static final int QUERY_EXPRESSION_CONDITION = 36;

/**
* Condition types.
@@ -403,7 +404,7 @@ public Query<T> where(String indexName, Condition condition, Object... values) {

buffer.putVarUInt32(values.length);
for (Object key : values) {
-putValue(key);
+buffer.putValue(key);
}

return this;
@@ -444,7 +445,7 @@ public Query<T> where(Query<?> subquery, Condition condition, Object... values)

buffer.putVarUInt32(values.length);
for (Object key : values) {
-putValue(key);
+buffer.putValue(key);
}

return this;
@@ -472,6 +473,51 @@ public Query<T> where(String indexName, Condition condition, Query<?> subquery)
return this;
}

/**
* Where predicate between {@code left} and {@code right} expressions using {@code condition}.
* Supported combinations:
* <ul>
* <li>{@link Expression#field(String) field} condition {@link Expression#values(Object...) values}</li>
* <li>{@link Expression#field(String) field} condition {@link Expression#now(TimeUnit) now(unit)}</li>
* <li>{@link Expression#field(String) field} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}</li>
* <li>{@link Expression#field(String) field} condition {@link Expression#subQuery(Query) subquery}</li>
* <li>{@link Expression#field(String) field} condition {@link Expression#field(String) field}</li>
* <li>{@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#values(Object...) values}</li>
* <li>{@link Expression#flatArrayLength(String) flat_array_len(field)} condition {@link Expression#subQuery(Query) subquery}</li>
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#values(Object...) values}</li>
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#now(TimeUnit) now(unit)}</li>
* <li>{@link Expression#subQuery(Query) subquery} condition {@link Expression#flatArrayLength(String) flat_array_len(field)}</li>
* </ul>
* Example: {@code where(Expression.field("created_at"), Condition.LE, Expression.now())}.
* <p>
*
* @param left the left operand {@link Expression} to use
* @param condition the {@link Condition} to use
* @param right the right operand {@link Expression} to use
* @return the {@link Query} for further customizations
* @see Expression for factory methods to build different types of expressions
*/
public Query<T> where(WhereExpression left, Condition condition, WhereExpression right) {
Objects.requireNonNull(left, "left expression cannot be null");
Objects.requireNonNull(right, "right expression cannot be null");

logBuilder.where(nextOperation, left, condition.code, right);

buffer.putVarUInt32(QUERY_EXPRESSION_CONDITION);

left.serializeWhere(buffer);

buffer.putVarUInt32(nextOperation);
buffer.putVarUInt32(condition.code);

right.serializeWhere(buffer);

nextOperation = OP_AND;
queryCount++;

return this;
}

/**
* The condition is possible only on vector-indexed fields,
* marked with {@link Hnsw}, {@link Ivf}, {@link VecBf} annotations.
@@ -785,7 +831,7 @@ public Query<T> sort(String index, boolean desc, Object... values) {

buffer.putVarUInt32(values.length);
for (Object value : values) {
-putValue(value);
+buffer.putValue(value);
}

return this;
@@ -803,55 +849,6 @@ public Query<T> fetchCount(int fetchCount) {
return this;
}

-private void putValue(Object value) {
-if (value == null) {
-buffer.putVarUInt32(VALUE_NULL);
-} else if (value instanceof Boolean) {
-buffer.putVarUInt32(VALUE_BOOL);
-if ((Boolean) value) {
-buffer.putVarUInt32(1);
-} else {
-buffer.putVarUInt32(0);
-}
-} else if (value instanceof Integer) {
-buffer.putVarUInt32(Consts.VALUE_INT)
-.putVarInt64((Integer) value);
-} else if (value instanceof String) {
-buffer.putVarUInt32(Consts.VALUE_STRING)
-.putVString((String) value);
-} else if (value instanceof Long) {
-buffer.putVarUInt32(Consts.VALUE_INT_64)
-.putVarInt64((Long) value);
-} else if (value instanceof Byte) {
-buffer.putVarUInt32(Consts.VALUE_INT)
-.putVarInt64((Byte) value);
-} else if (value instanceof Short) {
-buffer.putVarUInt32(Consts.VALUE_INT)
-.putVarInt64((Short) value);
-} else if (value instanceof Double) {
-buffer.putVarUInt32(Consts.VALUE_DOUBLE)
-.putDouble((Double) value);
-} else if (value instanceof Float) {
-Float floatValue = (Float) value;
-buffer.putVarUInt32(Consts.VALUE_DOUBLE)
-.putDouble(floatValue.doubleValue());
-} else if (value instanceof Character) {
-Character character = (Character) value;
-buffer.putVarUInt32(Consts.VALUE_STRING)
-.putVString(character.toString());
-} else if (value instanceof UUID) {
-buffer.putVarUInt32(Consts.VALUE_UUID)
-.putUuid((UUID) value);
-} else if (value instanceof Object[]) {
-buffer.putVarUInt32(Consts.VALUE_TUPLE);
-Object[] objects = (Object[]) value;
-buffer.putVarUInt32(objects.length);
-for (Object object : objects) {
-putValue(object);
-}
-}
-}

/**
* Executes the query and returns a stream of items.
* The returned stream must be closed using the {@link Stream#close()} method or
@@ -1122,7 +1119,7 @@ public Query<T> set(String fieldName, Object value) {
buffer.putVarUInt32(values.size());
for (Object v : values) {
buffer.putVarUInt32(0);
-putValue(v);
+buffer.putValue(v);
}
} else if (value != null && value.getClass().isArray()) {
Object[] values = (Object[]) value;
@@ -1137,19 +1134,43 @@ public Query<T> set(String fieldName, Object value) {
buffer.putVarUInt32(values.length);
for (Object v : values) {
buffer.putVarUInt32(0);
-putValue(v);
+buffer.putValue(v);
}
} else {
buffer.putVarUInt32(cmd);
buffer.putVString(fieldName);
buffer.putVarUInt32(1);
buffer.putVarUInt32(0);
-putValue(value);
+buffer.putValue(value);
}

return this;
}

/**
* Updates an indexed field using a {@link SetExpression}.
* <p>
* Example: {@code setExpression("created_at", Expression.string("now() - 1 * 24 * 60 * 60"))}.
* <p>
*
* @param fieldName the field name to use
* @param expression the expression to use
* @return the {@link Query} for further customizations
* @see Expression for factory methods to build different types of expressions
*/
public Query<T> setExpression(String fieldName, SetExpression expression) {
Objects.requireNonNull(expression, "expression cannot be null");

logBuilder.set(fieldName, expression);

buffer.putVarUInt32(QUERY_UPDATE_FIELD);
buffer.putVString(fieldName);

expression.serializeSet(buffer);

return this;
}

private void setObject(String fieldName, Object value) {
boolean isArray = false;
int count = 1;
@@ -1286,4 +1307,23 @@ String getSql() {
return logBuilder.getSql();
}

/**
* Returns all used bytes from the {@link ByteBuffer}.
*
* @return all used bytes from the {@code ByteBuffer}
*/
public byte[] bytes() {
return buffer.bytes();
}

/**
* Returns the string representation of the query.
*
* @return the string representation of the query
*/
@Override
public String toString() {
return getSql();
}

}
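
The order in which the new `where(WhereExpression, Condition, WhereExpression)` overload writes to the buffer can be traced with a small stand-alone model. This is a sketch, not the library's serializer: `ExpressionConditionLayout`, `Operand`, `field`, `now`, and `layout` are hypothetical names, tokens are recorded as strings instead of varints, and the operation and condition codes passed in `main` are placeholders. Only the marker value 36 and the write order are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.List;

class ExpressionConditionLayout {

    static final int QUERY_EXPRESSION_CONDITION = 36; // value taken from the diff above

    // Hypothetical stand-in for WhereExpression: appends its own tokens.
    interface Operand {
        void serializeWhere(List<String> out);
    }

    static Operand field(String name) {
        return out -> out.add("field(" + name + ")");
    }

    static Operand now(String unit) {
        return out -> out.add("now(" + unit + ")");
    }

    // Mirrors the order of writes in where(left, condition, right).
    static List<String> layout(Operand left, int operation, int condition, Operand right) {
        List<String> out = new ArrayList<>();
        out.add("type=" + QUERY_EXPRESSION_CONDITION);
        left.serializeWhere(out);
        out.add("op=" + operation);
        out.add("cond=" + condition);
        right.serializeWhere(out);
        return out;
    }

    public static void main(String[] args) {
        // The codes 2 and 3 are placeholders, not the library's constants.
        System.out.println(layout(field("timestamp"), 2, 3, now("sec")));
        // prints [type=36, field(timestamp), op=2, cond=3, now(sec)]
    }
}
```

The operation and condition codes sit between the two operands, so each operand only has to know how to serialize itself.
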
17 changes: 16 additions & 1 deletion src/main/java/ru/rt/restream/reindexer/QueryLogBuilder.java
@@ -16,6 +16,7 @@
package ru.rt.restream.reindexer;

import org.apache.commons.lang3.StringUtils;
import ru.rt.restream.reindexer.expression.Expression;
import ru.rt.restream.reindexer.vector.params.KnnSearchParam;

import java.util.ArrayDeque;
@@ -59,7 +60,7 @@ private static class AggregateEntry {

private static class QueryEntry {
private Operation operation;
-private String field;
+private Object field;
private Condition condition;
private String secondField;
private int joinIndex = -1;
@@ -299,6 +300,20 @@ void where(int operationCode, String field, int conditionCode, Object... values)
}
}

void where(int operationCode, Expression left, int conditionCode, Expression right) {
QueryEntry queryEntry = new QueryEntry();
queryEntry.operation = getOperation(operationCode);
queryEntry.field = left;
queryEntry.condition = getCondition(conditionCode);
queryEntry.values.add(right);
if (!whereStack.isEmpty()) {
QueryEntry parent = whereStack.getLast();
parent.children.add(queryEntry);
} else {
whereEntries.add(queryEntry);
}
}
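
The attach rule used by the new `where` overload (add to the innermost open group if `whereStack` is non-empty, otherwise to the top-level list) can be tried in isolation. The sketch below is a simplified model under stated assumptions: `WhereNestingModel`, `Entry`, `openBracket`, `closeBracket`, `topLevelCount`, and `childCount` are hypothetical, and how the real `QueryLogBuilder` pushes and pops `whereStack` is not shown in this diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class WhereNestingModel {

    static class Entry {
        final String text;
        final List<Entry> children = new ArrayList<>();

        Entry(String text) {
            this.text = text;
        }
    }

    private final Deque<Entry> whereStack = new ArrayDeque<>();
    private final List<Entry> whereEntries = new ArrayList<>();

    // Same attach rule as in the diff: the innermost open group wins.
    void where(String text) {
        attach(new Entry(text));
    }

    // Hypothetical: opens a group that later entries nest under.
    void openBracket() {
        Entry group = new Entry("(");
        attach(group);
        whereStack.addLast(group);
    }

    // Hypothetical: closes the innermost open group.
    void closeBracket() {
        whereStack.removeLast();
    }

    private void attach(Entry entry) {
        if (!whereStack.isEmpty()) {
            whereStack.getLast().children.add(entry);
        } else {
            whereEntries.add(entry);
        }
    }

    int topLevelCount() {
        return whereEntries.size();
    }

    int childCount(int index) {
        return whereEntries.get(index).children.size();
    }

    public static void main(String[] args) {
        WhereNestingModel model = new WhereNestingModel();
        model.where("a");
        model.openBracket();
        model.where("b");
        model.where("c");
        model.closeBracket();
        model.where("d");
        System.out.println(model.topLevelCount()); // 3: a, the group, d
        System.out.println(model.childCount(1));   // 2: b and c inside the group
    }
}
```
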

void whereKnn(int operationCode, String indexName, float[] vector, KnnSearchParam params) {
QueryEntry queryEntry = new QueryEntry();
queryEntry.operation = getOperation(operationCode);