Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions builtin-adapter/BuiltinAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,13 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA
jlong rx,
jlong ctxId,
jlong timeout,
jbyteArray data) {
jbyteArray data,
jlongArray versions) {
auto ptVersions = reinterpret_cast<int32_t *>(env->GetLongArrayElements(versions, nullptr));
int ptVersionsCount = env->GetArrayLength(versions);
reindexer_buffer bufferData = rx_buffer(env, data);
jobject res = j_res(env, reindexer_update_query(rx, bufferData, rx_ctx(ctxId, timeout)));
jobject res = j_res(env, reindexer_update_query(rx, bufferData, ptVersions, ptVersionsCount, rx_ctx(ctxId, timeout)));
env->ReleaseLongArrayElements(versions, reinterpret_cast<jlong *>(ptVersions), 0);
env->ReleaseByteArrayElements(data, reinterpret_cast<jbyte *>(bufferData.data), 0);
return res;
}
Expand Down
3 changes: 2 additions & 1 deletion builtin-adapter/BuiltinAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,8 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA

JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQuery(JNIEnv *, jobject,
jlong, jlong, jlong,
jbyteArray);
jbyteArray,
jlongArray);

JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQueryTx(JNIEnv *, jobject,
jlong, jlong,
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/ru/rt/restream/reindexer/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -1074,8 +1074,7 @@ private long[] prepareQueryAndGetPayloadTypesVersions() {

return namespaces.stream()
.map(ReindexerNamespace::getPayloadType)
.map(pt -> pt == null ? 0 : pt.getStateToken())
.mapToLong(Integer::longValue)
.mapToLong(pt -> pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken()))
.toArray();
}

Expand Down Expand Up @@ -1245,7 +1244,11 @@ public void update() {
if (transactionContext != null) {
transactionContext.updateQuery(buffer.bytes());
} else {
reindexer.getBinding().updateQuery(buffer.bytes());
// There are no support for inner joins for update-queries in Java binding,
// so we are using single pt version
PayloadType pt = namespace.getPayloadType();
long tmVersion = pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken());
reindexer.getBinding().updateQuery(buffer.bytes(), new long[]{tmVersion});
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/ru/rt/restream/reindexer/binding/Binding.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,9 @@ public interface Binding {
* Invoke update query.
*
* @param queryData encoded query data (selected indexes, predicates, etc)
* @param ptVersions payload type state tokens
*/
void updateQuery(byte[] queryData);
void updateQuery(byte[] queryData, long[] ptVersions);

/**
* Starts a transaction for the given namespace name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,8 @@ public void deleteQuery(byte[] queryData) {
}

@Override
public void updateQuery(byte[] queryData) {
ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData);
public void updateQuery(byte[] queryData, long[] ptVersions) {
ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData, ptVersions);
checkResponse(response);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ public native ReindexerResponse openNamespace(long rx, long ctxId, long timeout,
* @param timeout the execution timeout
* @param query the SQL-query string
* @param asJson 'true' if response should be serialized in JSON format, defaults to CJSON
* @param versions the versions
* @param versions the tagsmatcher versions
* @return the {@link ReindexerResponse} to use
*/
public native ReindexerResponse select(long rx, long ctxId, long timeout, String query, boolean asJson,
Expand Down Expand Up @@ -319,9 +319,10 @@ public native ReindexerResponse select(long rx, long ctxId, long timeout, String
* @param ctxId the context id
* @param timeout the execution timeout
* @param data the query payload (i.e. predicates, joins etc)
* @param versions the tagsmatcher versions
* @return the {@link ReindexerResponse} to use
*/
public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data);
public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data, long[] versions);

/**
* Executes update query in the transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ public void deleteQuery(byte[] queryData) {
}

@Override
public void updateQuery(byte[] queryData) {
builtin.updateQuery(queryData);
public void updateQuery(byte[] queryData, long[] ptVersions) {
builtin.updateQuery(queryData, ptVersions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ public void deleteQuery(byte[] queryData) {
}

@Override
public void updateQuery(byte[] queryData) {
rpcCallNoResults(UPDATE_QUERY, queryData);
public void updateQuery(byte[] queryData, long[] ptVersions) {
int flags = Consts.RESULTS_PURE | Consts.RESULTS_WITH_PAYLOAD_TYPES;
rpcCallNoResults(UPDATE_QUERY, queryData, flags, ptVersions);
}

@Override
Expand Down
Loading