diff --git a/ext/gvl_wrappers.h b/ext/gvl_wrappers.h index f048d7055..81eb646ea 100644 --- a/ext/gvl_wrappers.h +++ b/ext/gvl_wrappers.h @@ -15,12 +15,17 @@ #ifndef __gvl_wrappers_h #define __gvl_wrappers_h +#include #include #ifdef RUBY_EXTCONF_H # include RUBY_EXTCONF_H #endif +#if RUBY_API_VERSION_MAJOR < 4 +extern int ruby_thread_has_gvl_p(void); +#endif + #ifndef LIBPQ_HAS_CHUNK_MODE typedef struct pg_cancel_conn PGcancelConn; #endif @@ -83,20 +88,35 @@ typedef struct pg_cancel_conn PGcancelConn; } #ifdef ENABLE_GVL_UNLOCK -#define DEFINE_GVLCB_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \ - rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \ - struct gvl_wrapper_##name##_params params = { \ - {FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname}, when_non_void((rettype)0) \ - }; \ - rb_thread_call_with_gvl(gvl_##name##_skeleton, ¶ms); \ - when_non_void( return params.retval; ) \ - } + #if RUBY_API_VERSION_MAJOR >= 4 || defined(TRUFFLERUBY) + #define DEFINE_GVLCB_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \ + rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \ + struct gvl_wrapper_##name##_params params = { \ + {FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname}, when_non_void((rettype)0) \ + }; \ + rb_thread_call_with_gvl(gvl_##name##_skeleton, ¶ms); \ + when_non_void( return params.retval; ) \ + } + #else + #define DEFINE_GVLCB_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \ + rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \ + struct gvl_wrapper_##name##_params params = { \ + {FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname}, when_non_void((rettype)0) \ + }; \ + if (ruby_thread_has_gvl_p()) { \ + gvl_##name##_skeleton(¶ms); \ + } else { \ + rb_thread_call_with_gvl(gvl_##name##_skeleton, ¶ms); \ + } \ + when_non_void( return params.retval; ) \ + } + #endif #else -#define DEFINE_GVLCB_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \ - rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \ - when_non_void( return ) \ - name( FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname ); \ - } + #define DEFINE_GVLCB_STUB(name, when_non_void, rettype, lastparamtype, lastparamname) \ + rettype gvl_##name(FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST3) lastparamtype lastparamname){ \ + when_non_void( return ) \ + name( FOR_EACH_PARAM_OF_##name(DEFINE_PARAM_LIST1) lastparamname ); \ + } #endif #define GVL_TYPE_VOID(string) @@ -121,50 +141,8 @@ typedef struct pg_cancel_conn PGcancelConn; #define FOR_EACH_PARAM_OF_PQping(param) -#define FOR_EACH_PARAM_OF_PQexec(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQexecParams(param) \ - param(PGconn *, conn) \ - param(const char *, command) \ - param(int, nParams) \ - param(const Oid *, paramTypes) \ - param(const char * const *, paramValues) \ - param(const int *, paramLengths) \ - param(const int *, paramFormats) - -#define FOR_EACH_PARAM_OF_PQexecPrepared(param) \ - param(PGconn *, conn) \ - param(const char *, stmtName) \ - param(int, nParams) \ - param(const char * const *, paramValues) \ - param(const int *, paramLengths) \ - param(const int *, paramFormats) - -#define FOR_EACH_PARAM_OF_PQprepare(param) \ - param(PGconn *, conn) \ - param(const char *, stmtName) \ - param(const char *, query) \ - param(int, nParams) - -#define FOR_EACH_PARAM_OF_PQdescribePrepared(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQdescribePortal(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQclosePrepared(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQclosePortal(param) \ - param(PGconn *, conn) - #define FOR_EACH_PARAM_OF_PQgetResult(param) -#define FOR_EACH_PARAM_OF_PQputCopyData(param) \ - param(PGconn *, conn) \ - param(const char *, buffer) - #define FOR_EACH_PARAM_OF_PQputCopyEnd(param) \ param(PGconn *, conn) @@ -174,48 +152,8 @@ typedef struct pg_cancel_conn PGcancelConn; #define FOR_EACH_PARAM_OF_PQnotifies(param) -#define FOR_EACH_PARAM_OF_PQsendQuery(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQsendQueryParams(param) \ - param(PGconn *, conn) \ - param(const char *, command) \ - param(int, nParams) \ - param(const Oid *, paramTypes) \ - param(const char *const *, paramValues) \ - param(const int *, paramLengths) \ - param(const int *, paramFormats) - -#define FOR_EACH_PARAM_OF_PQsendPrepare(param) \ - param(PGconn *, conn) \ - param(const char *, stmtName) \ - param(const char *, query) \ - param(int, nParams) - -#define FOR_EACH_PARAM_OF_PQsendQueryPrepared(param) \ - param(PGconn *, conn) \ - param(const char *, stmtName) \ - param(int, nParams) \ - param(const char *const *, paramValues) \ - param(const int *, paramLengths) \ - param(const int *, paramFormats) - -#define FOR_EACH_PARAM_OF_PQsendDescribePrepared(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQsendDescribePortal(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQsendClosePrepared(param) \ - param(PGconn *, conn) - -#define FOR_EACH_PARAM_OF_PQsendClosePortal(param) \ - param(PGconn *, conn) - #define FOR_EACH_PARAM_OF_PQpipelineSync(param) -#define FOR_EACH_PARAM_OF_PQsendPipelineSync(param) - #define FOR_EACH_PARAM_OF_PQsetClientEncoding(param) \ param(PGconn *, conn) @@ -225,11 +163,6 @@ typedef struct pg_cancel_conn PGcancelConn; #define FOR_EACH_PARAM_OF_PQcancelStart(param) #define FOR_EACH_PARAM_OF_PQcancelPoll(param) -#define FOR_EACH_PARAM_OF_PQencryptPasswordConn(param) \ - param(PGconn *, conn) \ - param(const char *, passwd) \ - param(const char *, user) - #define FOR_EACH_PARAM_OF_PQcancel(param) \ param(PGcancel *, cancel) \ param(char *, errbuf) @@ -243,35 +176,16 @@ typedef struct pg_cancel_conn PGcancelConn; function(PQresetStart, GVL_TYPE_NONVOID, int, PGconn *, conn) \ function(PQresetPoll, GVL_TYPE_NONVOID, PostgresPollingStatusType, PGconn *, conn) \ function(PQping, GVL_TYPE_NONVOID, PGPing, const char *, conninfo) \ - function(PQexec, GVL_TYPE_NONVOID, PGresult *, const char *, command) \ - function(PQexecParams, GVL_TYPE_NONVOID, PGresult *, int, resultFormat) \ - function(PQexecPrepared, GVL_TYPE_NONVOID, PGresult *, int, resultFormat) \ - function(PQprepare, GVL_TYPE_NONVOID, PGresult *, const Oid *, paramTypes) \ - function(PQdescribePrepared, GVL_TYPE_NONVOID, PGresult *, const char *, stmtName) \ - function(PQdescribePortal, GVL_TYPE_NONVOID, PGresult *, const char *, portalName) \ - function(PQclosePrepared, GVL_TYPE_NONVOID, PGresult *, const char *, stmtName) \ - function(PQclosePortal, GVL_TYPE_NONVOID, PGresult *, const char *, portalName) \ function(PQgetResult, GVL_TYPE_NONVOID, PGresult *, PGconn *, conn) \ - function(PQputCopyData, GVL_TYPE_NONVOID, int, int, nbytes) \ function(PQputCopyEnd, GVL_TYPE_NONVOID, int, const char *, errormsg) \ function(PQgetCopyData, GVL_TYPE_NONVOID, int, int, async) \ function(PQnotifies, GVL_TYPE_NONVOID, PGnotify *, PGconn *, conn) \ - function(PQsendQuery, GVL_TYPE_NONVOID, int, const char *, query) \ - function(PQsendQueryParams, GVL_TYPE_NONVOID, int, int, resultFormat) \ - function(PQsendPrepare, GVL_TYPE_NONVOID, int, const Oid *, paramTypes) \ - function(PQsendQueryPrepared, GVL_TYPE_NONVOID, int, int, resultFormat) \ - function(PQsendDescribePrepared, GVL_TYPE_NONVOID, int, const char *, stmt) \ - function(PQsendDescribePortal, GVL_TYPE_NONVOID, int, const char *, portal) \ - function(PQsendClosePrepared, GVL_TYPE_NONVOID, int, const char *, stmt) \ - function(PQsendClosePortal, GVL_TYPE_NONVOID, int, const char *, portal) \ function(PQpipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \ - function(PQsendPipelineSync, GVL_TYPE_NONVOID, int, PGconn *, conn) \ function(PQsetClientEncoding, GVL_TYPE_NONVOID, int, const char *, encoding) \ function(PQisBusy, GVL_TYPE_NONVOID, int, PGconn *, conn) \ function(PQcancelBlocking, GVL_TYPE_NONVOID, int, PGcancelConn *, conn) \ function(PQcancelStart, GVL_TYPE_NONVOID, int, PGcancelConn *, conn) \ function(PQcancelPoll, GVL_TYPE_NONVOID, PostgresPollingStatusType, PGcancelConn *, conn) \ - function(PQencryptPasswordConn, GVL_TYPE_NONVOID, char *, const char *, algorithm) \ function(PQcancel, GVL_TYPE_NONVOID, int, int, errbufsize); FOR_EACH_BLOCKING_FUNCTION( DEFINE_GVL_STUB_DECL ); diff --git a/ext/pg.h b/ext/pg.h index 825eaf8eb..f230ee462 100644 --- a/ext/pg.h +++ b/ext/pg.h @@ -82,6 +82,8 @@ typedef long suseconds_t; #define pg_gc_location(x) x = rb_gc_location(x) +extern int ruby_native_thread_p(void); + /* For compatibility with ruby < 3.0 */ #ifndef RUBY_TYPED_FROZEN_SHAREABLE #define PG_RUBY_TYPED_FROZEN_SHAREABLE 0 diff --git a/ext/pg_connection.c b/ext/pg_connection.c index ed4ff8a9d..8e32fd548 100644 --- a/ext/pg_connection.c +++ b/ext/pg_connection.c @@ -434,7 +434,7 @@ pgconn_sync_encrypt_password(int argc, VALUE *argv, VALUE self) Check_Type(password, T_STRING); Check_Type(username, T_STRING); - encrypted = gvl_PQencryptPasswordConn(conn, StringValueCStr(password), StringValueCStr(username), RTEST(algorithm) ? StringValueCStr(algorithm) : NULL); + encrypted = PQencryptPasswordConn(conn, StringValueCStr(password), StringValueCStr(username), RTEST(algorithm) ? StringValueCStr(algorithm) : NULL); if ( encrypted ) { rval = rb_str_new2( encrypted ); PQfreemem( encrypted ); @@ -1128,11 +1128,11 @@ static VALUE pgconn_sync_exec_params( int, VALUE *, VALUE ); * This function has the same behavior as #async_exec, but is implemented using the synchronous command processing API of libpq. * It's not recommended to use explicit sync or async variants but #exec instead, unless you have a good reason to do so. * - * Both #sync_exec and #async_exec release the GVL while waiting for server response, so that concurrent threads will get executed. - * However #async_exec has two advantages: + * However #async_exec has some advantages: * - * 1. #async_exec can be aborted by signals (like Ctrl-C), while #exec blocks signal processing until the query is answered. - * 2. Ruby VM gets notified about IO blocked operations and can pass them through Fiber.scheduler. + * 1. Only #async_exec allows concurrent threads to run, while waiting for a server response. + * 2. #async_exec can be aborted by signals (like Ctrl-C), while #exec blocks signal processing until the query is answered. + * 3. Ruby VM gets notified about IO blocked operations and can pass them through Fiber.scheduler. * So only async_* methods are compatible to event based schedulers like the async gem. */ static VALUE @@ -1147,7 +1147,7 @@ pgconn_sync_exec(int argc, VALUE *argv, VALUE self) VALUE query_str = argv[0]; VALUE transcoded_str; - result = gvl_PQexec(this->pgconn, pg_cstr_enc(query_str, this->enc_idx, &transcoded_str)); + result = PQexec(this->pgconn, pg_cstr_enc(query_str, this->enc_idx, &transcoded_str)); RB_GC_GUARD(transcoded_str); rb_pgresult = pg_new_result(result, self); pg_result_check(rb_pgresult); @@ -1467,7 +1467,7 @@ pgconn_sync_exec_params( int argc, VALUE *argv, VALUE self ) resultFormat = NIL_P(in_res_fmt) ? 0 : NUM2INT(in_res_fmt); nParams = alloc_query_params( ¶msData ); - result = gvl_PQexecParams(this->pgconn, pg_cstr_enc(command, paramsData.enc_idx, &transcoded_str), nParams, paramsData.types, + result = PQexecParams(this->pgconn, pg_cstr_enc(command, paramsData.enc_idx, &transcoded_str), nParams, paramsData.types, (const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat); RB_GC_GUARD(transcoded_str); @@ -1523,7 +1523,7 @@ pgconn_sync_prepare(int argc, VALUE *argv, VALUE self) paramTypes[i] = NUM2UINT(param); } } - result = gvl_PQprepare(this->pgconn, name_cstr, command_cstr, nParams, paramTypes); + result = PQprepare(this->pgconn, name_cstr, command_cstr, nParams, paramTypes); RB_GC_GUARD(transcoded_str1); RB_GC_GUARD(transcoded_str2); @@ -1566,7 +1566,7 @@ pgconn_sync_exec_prepared(int argc, VALUE *argv, VALUE self) resultFormat = NIL_P(in_res_fmt) ? 0 : NUM2INT(in_res_fmt); nParams = alloc_query_params( ¶msData ); - result = gvl_PQexecPrepared(this->pgconn, pg_cstr_enc(name, paramsData.enc_idx, &transcoded_str), nParams, + result = PQexecPrepared(this->pgconn, pg_cstr_enc(name, paramsData.enc_idx, &transcoded_str), nParams, (const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat); @@ -1610,7 +1610,7 @@ pgconn_sync_describe_close_prepared_portal(VALUE self, VALUE name, PGresult *(*f static VALUE pgconn_sync_describe_prepared(VALUE self, VALUE stmt_name) { - return pgconn_sync_describe_close_prepared_portal(self, stmt_name, gvl_PQdescribePrepared); + return pgconn_sync_describe_close_prepared_portal(self, stmt_name, PQdescribePrepared); } @@ -1625,7 +1625,7 @@ pgconn_sync_describe_prepared(VALUE self, VALUE stmt_name) static VALUE pgconn_sync_describe_portal(VALUE self, VALUE stmt_name) { - return pgconn_sync_describe_close_prepared_portal(self, stmt_name, gvl_PQdescribePortal); + return pgconn_sync_describe_close_prepared_portal(self, stmt_name, PQdescribePortal); } @@ -1643,7 +1643,7 @@ pgconn_sync_describe_portal(VALUE self, VALUE stmt_name) static VALUE pgconn_sync_close_prepared(VALUE self, VALUE stmt_name) { - return pgconn_sync_describe_close_prepared_portal(self, stmt_name, gvl_PQclosePrepared); + return pgconn_sync_describe_close_prepared_portal(self, stmt_name, PQclosePrepared); } /* @@ -1659,7 +1659,7 @@ pgconn_sync_close_prepared(VALUE self, VALUE stmt_name) static VALUE pgconn_sync_close_portal(VALUE self, VALUE stmt_name) { - return pgconn_sync_describe_close_prepared_portal(self, stmt_name, gvl_PQclosePortal); + return pgconn_sync_describe_close_prepared_portal(self, stmt_name, PQclosePortal); } #endif @@ -1995,7 +1995,7 @@ pgconn_send_query(int argc, VALUE *argv, VALUE self) /* If called with no or nil parameters, use PQexec for compatibility */ if ( argc == 1 || (argc >= 2 && argc <= 4 && NIL_P(argv[1]) )) { - if(gvl_PQsendQuery(this->pgconn, pg_cstr_enc(argv[0], this->enc_idx, &transcoded_str)) == 0) + if(PQsendQuery(this->pgconn, pg_cstr_enc(argv[0], this->enc_idx, &transcoded_str)) == 0) pg_raise_conn_error( rb_eUnableToSend, self, "PQsendQuery %s", PQerrorMessage(this->pgconn)); RB_GC_GUARD(transcoded_str); @@ -2067,7 +2067,7 @@ pgconn_send_query_params(int argc, VALUE *argv, VALUE self) resultFormat = NIL_P(in_res_fmt) ? 0 : NUM2INT(in_res_fmt); nParams = alloc_query_params( ¶msData ); - result = gvl_PQsendQueryParams(this->pgconn, pg_cstr_enc(command, paramsData.enc_idx, &transcoded_str), nParams, paramsData.types, + result = PQsendQueryParams(this->pgconn, pg_cstr_enc(command, paramsData.enc_idx, &transcoded_str), nParams, paramsData.types, (const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat); RB_GC_GUARD(transcoded_str); @@ -2131,7 +2131,7 @@ pgconn_send_prepare(int argc, VALUE *argv, VALUE self) paramTypes[i] = NUM2UINT(param); } } - result = gvl_PQsendPrepare(this->pgconn, name_cstr, command_cstr, nParams, paramTypes); + result = PQsendPrepare(this->pgconn, name_cstr, command_cstr, nParams, paramTypes); RB_GC_GUARD(transcoded_str1); RB_GC_GUARD(transcoded_str2); @@ -2198,7 +2198,7 @@ pgconn_send_query_prepared(int argc, VALUE *argv, VALUE self) resultFormat = NIL_P(in_res_fmt) ? 0 : NUM2INT(in_res_fmt); nParams = alloc_query_params( ¶msData ); - result = gvl_PQsendQueryPrepared(this->pgconn, pg_cstr_enc(name, paramsData.enc_idx, &transcoded_str), nParams, + result = PQsendQueryPrepared(this->pgconn, pg_cstr_enc(name, paramsData.enc_idx, &transcoded_str), nParams, (const char * const *)paramsData.values, paramsData.lengths, paramsData.formats, resultFormat); @@ -2239,7 +2239,7 @@ static VALUE pgconn_send_describe_prepared(VALUE self, VALUE stmt_name) { return pgconn_send_describe_close_prepared_portal( - self, stmt_name, gvl_PQsendDescribePrepared, + self, stmt_name, PQsendDescribePrepared, "PQsendDescribePrepared"); } @@ -2255,7 +2255,7 @@ static VALUE pgconn_send_describe_portal(VALUE self, VALUE portal) { return pgconn_send_describe_close_prepared_portal( - self, portal, gvl_PQsendDescribePortal, + self, portal, PQsendDescribePortal, "PQsendDescribePortal"); } @@ -2273,7 +2273,7 @@ static VALUE pgconn_send_close_prepared(VALUE self, VALUE stmt_name) { return pgconn_send_describe_close_prepared_portal( - self, stmt_name, gvl_PQsendClosePrepared, + self, stmt_name, PQsendClosePrepared, "PQsendClosePrepared"); } @@ -2291,7 +2291,7 @@ static VALUE pgconn_send_close_portal(VALUE self, VALUE portal) { return pgconn_send_describe_close_prepared_portal( - self, portal, gvl_PQsendClosePortal, + self, portal, PQsendClosePortal, "PQsendClosePortal"); } #endif @@ -2787,7 +2787,7 @@ pgconn_sync_put_copy_data(int argc, VALUE *argv, VALUE self) Check_Type(buffer, T_STRING); - ret = gvl_PQputCopyData(this->pgconn, RSTRING_PTR(buffer), RSTRING_LENINT(buffer)); + ret = PQputCopyData(this->pgconn, RSTRING_PTR(buffer), RSTRING_LENINT(buffer)); if(ret == -1) pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(this->pgconn)); @@ -3452,12 +3452,12 @@ pgconn_discard_results(VALUE self) * and the PG::Result object will automatically be cleared when the block terminates. * In this instance, conn.exec returns the value of the block. * - * #exec is an alias for #async_exec which is almost identical to #sync_exec . + * #exec is an alias for #async_exec which is functional identical to #sync_exec . * #sync_exec is implemented on the simpler synchronous command processing API of libpq, whereas * #async_exec is implemented on the asynchronous API and on ruby's IO mechanisms. * Only #async_exec is compatible to Fiber.scheduler based asynchronous IO processing introduced in ruby-3.0. - * Both methods ensure that other threads can process while waiting for the server to - * complete the request, but #sync_exec blocks all signals to be processed until the query is finished. + * Only #async_exec ensures that other threads can process while waiting for the server to complete the request. + * In contrast #sync_exec blocks all threads and signals to be processed until the query is finished. * This is most notably visible by a delayed reaction to Control+C. * It's not recommended to use explicit sync or async variants but #exec instead, unless you have a good reason to do so. * @@ -3918,7 +3918,7 @@ static VALUE pgconn_send_pipeline_sync(VALUE self) { PGconn *conn = pg_get_pgconn(self); - int res = gvl_PQsendPipelineSync(conn); + int res = PQsendPipelineSync(conn); if( res != 1 ) pg_raise_conn_error( rb_ePGerror, self, "%s", PQerrorMessage(conn)); diff --git a/spec/pg/connection_spec.rb b/spec/pg/connection_spec.rb index 1546344f7..b5e88db60 100644 --- a/spec/pg/connection_spec.rb +++ b/spec/pg/connection_spec.rb @@ -764,11 +764,13 @@ conn.setnonblocking(true) res = nil - conn.exec <<-EOSQL + # use async_exec since sync_exec is no longer thread compatible, necessary for run_with_gate + conn.async_exec <<-EOSQL CREATE TEMP TABLE copytable (col1 TEXT); EOSQL - conn.exec( "COPY copytable FROM STDOUT CSV" ) + conn.async_exec( "COPY copytable FROM STDOUT CSV" ) + gate.stop data = "x" * 1000 * 1000