From e6d4976840b3ca442f4902ad250040158b1320e4 Mon Sep 17 00:00:00 2001 From: madschemas <155993105+MadSchemas@users.noreply.github.com> Date: Wed, 29 Apr 2026 16:23:45 +0300 Subject: [PATCH] [fea] Add support for v5.13.0 reindexer API --- builtin-adapter/BuiltinAdapter.cpp | 8 ++++++-- builtin-adapter/BuiltinAdapter.h | 3 ++- src/main/java/ru/rt/restream/reindexer/Query.java | 9 ++++++--- .../java/ru/rt/restream/reindexer/binding/Binding.java | 3 ++- .../rt/restream/reindexer/binding/builtin/Builtin.java | 4 ++-- .../reindexer/binding/builtin/BuiltinAdapter.java | 5 +++-- .../reindexer/binding/builtin/server/BuiltinServer.java | 4 ++-- .../ru/rt/restream/reindexer/binding/cproto/Cproto.java | 5 +++-- 8 files changed, 26 insertions(+), 15 deletions(-) diff --git a/builtin-adapter/BuiltinAdapter.cpp b/builtin-adapter/BuiltinAdapter.cpp index b8a7561b..4bcd914d 100644 --- a/builtin-adapter/BuiltinAdapter.cpp +++ b/builtin-adapter/BuiltinAdapter.cpp @@ -282,9 +282,13 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA jlong rx, jlong ctxId, jlong timeout, - jbyteArray data) { + jbyteArray data, + jlongArray versions) { + auto ptVersions = reinterpret_cast(env->GetLongArrayElements(versions, nullptr)); + int ptVersionsCount = env->GetArrayLength(versions); reindexer_buffer bufferData = rx_buffer(env, data); - jobject res = j_res(env, reindexer_update_query(rx, bufferData, rx_ctx(ctxId, timeout))); + jobject res = j_res(env, reindexer_update_query(rx, bufferData, ptVersions, ptVersionsCount, rx_ctx(ctxId, timeout))); + env->ReleaseLongArrayElements(versions, reinterpret_cast(ptVersions), 0); env->ReleaseByteArrayElements(data, reinterpret_cast(bufferData.data), 0); return res; } diff --git a/builtin-adapter/BuiltinAdapter.h b/builtin-adapter/BuiltinAdapter.h index 0575c38e..b60ec97a 100644 --- a/builtin-adapter/BuiltinAdapter.h +++ b/builtin-adapter/BuiltinAdapter.h @@ -91,7 +91,8 @@ JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinA JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQuery(JNIEnv *, jobject, jlong, jlong, jlong, - jbyteArray); + jbyteArray, + jlongArray); JNIEXPORT jobject JNICALL Java_ru_rt_restream_reindexer_binding_builtin_BuiltinAdapter_updateQueryTx(JNIEnv *, jobject, jlong, jlong, diff --git a/src/main/java/ru/rt/restream/reindexer/Query.java b/src/main/java/ru/rt/restream/reindexer/Query.java index 73ffa1a0..a6533566 100644 --- a/src/main/java/ru/rt/restream/reindexer/Query.java +++ b/src/main/java/ru/rt/restream/reindexer/Query.java @@ -1074,8 +1074,7 @@ private long[] prepareQueryAndGetPayloadTypesVersions() { return namespaces.stream() .map(ReindexerNamespace::getPayloadType) - .map(pt -> pt == null ? 0 : pt.getStateToken()) - .mapToLong(Integer::longValue) + .mapToLong(pt -> pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken())) .toArray(); } @@ -1245,7 +1244,11 @@ public void update() { if (transactionContext != null) { transactionContext.updateQuery(buffer.bytes()); } else { - reindexer.getBinding().updateQuery(buffer.bytes()); + // There are no support for inner joins for update-queries in Java binding, + // so we are using single pt version + PayloadType pt = namespace.getPayloadType(); + long tmVersion = pt == null ? 0 : (pt.getVersion() ^ pt.getStateToken()); + reindexer.getBinding().updateQuery(buffer.bytes(), new long[]{tmVersion}); } } diff --git a/src/main/java/ru/rt/restream/reindexer/binding/Binding.java b/src/main/java/ru/rt/restream/reindexer/binding/Binding.java index d66b7691..e33f7a5e 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/Binding.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/Binding.java @@ -173,8 +173,9 @@ public interface Binding { * Invoke update query. * * @param queryData encoded query data (selected indexes, predicates, etc) + * @param ptVersions payload type state tokens */ - void updateQuery(byte[] queryData); + void updateQuery(byte[] queryData, long[] ptVersions); /** * Starts a transaction for the given namespace name. diff --git a/src/main/java/ru/rt/restream/reindexer/binding/builtin/Builtin.java b/src/main/java/ru/rt/restream/reindexer/binding/builtin/Builtin.java index 9cfdc2b4..21655239 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/builtin/Builtin.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/builtin/Builtin.java @@ -169,8 +169,8 @@ public void deleteQuery(byte[] queryData) { } @Override - public void updateQuery(byte[] queryData) { - ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData); + public void updateQuery(byte[] queryData, long[] ptVersions) { + ReindexerResponse response = adapter.updateQuery(rx, next.getAndIncrement(), timeout.toMillis(), queryData, ptVersions); checkResponse(response); } diff --git a/src/main/java/ru/rt/restream/reindexer/binding/builtin/BuiltinAdapter.java b/src/main/java/ru/rt/restream/reindexer/binding/builtin/BuiltinAdapter.java index 93b28391..6aa95340 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/builtin/BuiltinAdapter.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/builtin/BuiltinAdapter.java @@ -285,7 +285,7 @@ public native ReindexerResponse openNamespace(long rx, long ctxId, long timeout, * @param timeout the execution timeout * @param query the SQL-query string * @param asJson 'true' if response should be serialized in JSON format, defaults to CJSON - * @param versions the versions + * @param versions the tagsmatcher versions * @return the {@link ReindexerResponse} to use */ public native ReindexerResponse select(long rx, long ctxId, long timeout, String query, boolean asJson, @@ -319,9 +319,10 @@ public native ReindexerResponse select(long rx, long ctxId, long timeout, String * @param ctxId the context id * @param timeout the execution timeout * @param data the query payload (i.e. predicates, joins etc) + * @param versions the tagsmatcher versions * @return the {@link ReindexerResponse} to use */ - public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data); + public native ReindexerResponse updateQuery(long rx, long ctxId, long timeout, byte[] data, long[] versions); /** * Executes update query in the transaction. diff --git a/src/main/java/ru/rt/restream/reindexer/binding/builtin/server/BuiltinServer.java b/src/main/java/ru/rt/restream/reindexer/binding/builtin/server/BuiltinServer.java index d6882f66..bc1f8741 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/builtin/server/BuiltinServer.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/builtin/server/BuiltinServer.java @@ -172,8 +172,8 @@ public void deleteQuery(byte[] queryData) { } @Override - public void updateQuery(byte[] queryData) { - builtin.updateQuery(queryData); + public void updateQuery(byte[] queryData, long[] ptVersions) { + builtin.updateQuery(queryData, ptVersions); } @Override diff --git a/src/main/java/ru/rt/restream/reindexer/binding/cproto/Cproto.java b/src/main/java/ru/rt/restream/reindexer/binding/cproto/Cproto.java index db56291b..e369b696 100644 --- a/src/main/java/ru/rt/restream/reindexer/binding/cproto/Cproto.java +++ b/src/main/java/ru/rt/restream/reindexer/binding/cproto/Cproto.java @@ -165,8 +165,9 @@ public void deleteQuery(byte[] queryData) { } @Override - public void updateQuery(byte[] queryData) { - rpcCallNoResults(UPDATE_QUERY, queryData); + public void updateQuery(byte[] queryData, long[] ptVersions) { + int flags = Consts.RESULTS_PURE | Consts.RESULTS_WITH_PAYLOAD_TYPES; + rpcCallNoResults(UPDATE_QUERY, queryData, flags, ptVersions); } @Override