From 7ccd8c636b46da936e95c3aa8873a970b8bea2a2 Mon Sep 17 00:00:00 2001 From: John Gemignani Date: Thu, 30 Apr 2026 17:12:06 -0700 Subject: [PATCH] =?UTF-8?q?perf:=20hash-adjacency=20overhaul=20=E2=80=94?= =?UTF-8?q?=20agehash=20+=20flat-array=20VertexEdgeArray?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replaces the per-graph adjacency map with a Robin Hood open-addressing hashtable (agehash) and an embedded flat-array edge list, removing the hottest dynahash path on IC1 and shrinking the largest hashtable AGE keeps. Stages land as one commit: S1 MurmurHash3 fmix64 for graphid hashtables (replaces tag_hash) S2 Precompute graphid hash; share across paired DFS lookups S3 Replace ListGraphId adjacency with embedded flat-array VertexEdgeArray (single palloc, contiguous iteration) S4 Batched MLP lookup pipeline in add_valid_vertex_edges S5/C1 agehash library: INLINE Robin Hood hashtable with _with_hash API, freeze, iter, and a regress-only selftest S5/C2 Wire global graph edge_hashtable through agehash; drop edge_id from edge_entry (key lives in slot header); AGEHASH_MAX_LOAD=0.85; MemoryContextAllocHuge for SF10+ Performance (SF3 LDBC SNB, 5 runs/3 warmup, vs clean master baseline_v2): IC1 8,625 → 7,117 ms −17.49 % (the headline; hashtable-bound) IU1 40 → 35 ms −11.86 % (heaviest update; lookup-bound) IC sum 198,958 → 197,367 ms −0.80 % (suite-level noise) IS sum 1,009 → 1,028 ms +1.86 % (IS3 jitter; sub-ms) IU sum 77 → 72 ms −6.64 % IC2/3/4/5/6/7/8/9/10/11/12: parity (within ±3.3 %, mostly ±1.5 %) The VLE-DFS-heavy queries (IC3/5/6/9/11) sit at parity: with hash_search_with_hash_value at ≤1 % inclusive on their baseline flames, no hashtable swap can recover meaningful wall-time on them. Memory: removing edge_id from edge_entry saves ~416 MB on SF3 and ~1.4 GB on SF10 for the global graph's edge_hashtable. Slot capacity uses MemoryContextAllocHuge so SF10+ edge tables can be built. Adds: src/backend/utils/cache/agehash.c, src/include/utils/agehash.h regress/sql/agehash.sql + expected/agehash.out (boundary selftest) _agehash_self_test() in both fresh-install and upgrade SQL Tested on PostgreSQL 18.3 (REL_18_STABLE): all 35 regression tests pass (installcheck), warning-free build. Co-authored-by: Claude modified: Makefile modified: age--1.7.0--y.y.y.sql new file: regress/expected/agehash.out new file: regress/sql/agehash.sql modified: sql/age_main.sql modified: src/backend/utils/adt/age_global_graph.c modified: src/backend/utils/adt/age_vle.c new file: src/backend/utils/cache/agehash.c modified: src/include/utils/age_global_graph.h new file: src/include/utils/agehash.h --- Makefile | 2 + age--1.7.0--y.y.y.sql | 10 + regress/expected/agehash.out | 14 + regress/sql/agehash.sql | 9 + sql/age_main.sql | 10 + src/backend/utils/adt/age_global_graph.c | 283 +++++++--- src/backend/utils/adt/age_vle.c | 271 +++++---- src/backend/utils/cache/agehash.c | 687 +++++++++++++++++++++++ src/include/utils/age_global_graph.h | 49 +- src/include/utils/agehash.h | 210 +++++++ 10 files changed, 1372 insertions(+), 173 deletions(-) create mode 100644 regress/expected/agehash.out create mode 100644 regress/sql/agehash.sql create mode 100644 src/backend/utils/cache/agehash.c create mode 100644 src/include/utils/agehash.h diff --git a/Makefile b/Makefile index 3c5a28c29..52388276b 100644 --- a/Makefile +++ b/Makefile @@ -127,6 +127,7 @@ OBJS = src/backend/age.o \ src/backend/utils/ag_func.o \ src/backend/utils/graph_generation.o \ src/backend/utils/cache/ag_cache.o \ + src/backend/utils/cache/agehash.o \ src/backend/utils/load/ag_load_labels.o \ src/backend/utils/load/ag_load_edges.o \ src/backend/utils/load/age_load.o \ @@ -153,6 +154,7 @@ REGRESS = scan \ graphid \ agtype \ agtype_hash_cmp \ + agehash \ catalog \ cypher \ expr \ diff --git a/age--1.7.0--y.y.y.sql b/age--1.7.0--y.y.y.sql index a3cdea279..90a831d0f 100644 --- a/age--1.7.0--y.y.y.sql +++ b/age--1.7.0--y.y.y.sql @@ -459,3 +459,13 @@ BEGIN END LOOP; END; $$; + +-- Internal selftest for the agehash open-addressing hashtable. Returns "OK" +-- on success or "FAIL: ..." with a diagnostic message. Intended for the +-- agehash regression test only. +CREATE FUNCTION ag_catalog._agehash_self_test() + RETURNS text + LANGUAGE c + VOLATILE + PARALLEL UNSAFE +AS 'MODULE_PATHNAME'; diff --git a/regress/expected/agehash.out b/regress/expected/agehash.out new file mode 100644 index 000000000..a6e647978 --- /dev/null +++ b/regress/expected/agehash.out @@ -0,0 +1,14 @@ +/* + * agehash internal selftest. + * + * Exercises the Robin Hood open-addressing hashtable used by AGE's hot-path + * caches at boundary sizes (1, 7, 8, 9, 63, 64, 65, 1024, ...) and across + * grow thresholds. A success returns the literal "OK"; any failure returns + * "FAIL: " describing the offending case. + */ +SELECT ag_catalog._agehash_self_test(); + _agehash_self_test +-------------------- + OK +(1 row) + diff --git a/regress/sql/agehash.sql b/regress/sql/agehash.sql new file mode 100644 index 000000000..4e6f827e8 --- /dev/null +++ b/regress/sql/agehash.sql @@ -0,0 +1,9 @@ +/* + * agehash internal selftest. + * + * Exercises the Robin Hood open-addressing hashtable used by AGE's hot-path + * caches at boundary sizes (1, 7, 8, 9, 63, 64, 65, 1024, ...) and across + * grow thresholds. A success returns the literal "OK"; any failure returns + * "FAIL: " describing the offending case. + */ +SELECT ag_catalog._agehash_self_test(); diff --git a/sql/age_main.sql b/sql/age_main.sql index 3e9a71c92..dd8d8945e 100644 --- a/sql/age_main.sql +++ b/sql/age_main.sql @@ -86,6 +86,16 @@ CREATE FUNCTION ag_catalog._label_id(graph_name name, label_name name) PARALLEL SAFE AS 'MODULE_PATHNAME'; +-- Internal selftest for the agehash open-addressing hashtable. Returns "OK" +-- on success or "FAIL: ..." with a diagnostic message. Intended for the +-- agehash regression test only. +CREATE FUNCTION ag_catalog._agehash_self_test() + RETURNS text + LANGUAGE c + VOLATILE + PARALLEL UNSAFE +AS 'MODULE_PATHNAME'; + -- -- utility functions -- diff --git a/src/backend/utils/adt/age_global_graph.c b/src/backend/utils/adt/age_global_graph.c index a9b9b7111..8a7f2821e 100644 --- a/src/backend/utils/adt/age_global_graph.c +++ b/src/backend/utils/adt/age_global_graph.c @@ -40,6 +40,7 @@ #endif #include "utils/age_global_graph.h" +#include "utils/agehash.h" #include "catalog/ag_graph.h" #include "catalog/ag_label.h" #include "utils/ag_cache.h" @@ -48,7 +49,6 @@ /* defines */ #define VERTEX_HTAB_NAME "Vertex to edge lists " /* added a space at end for */ -#define EDGE_HTAB_NAME "Edge to vertex mapping " /* the graph name to follow */ #define VERTEX_HTAB_INITIAL_SIZE 10000 #define EDGE_HTAB_INITIAL_SIZE 10000 @@ -104,17 +104,26 @@ static GraphVersionState *shmem_version_state = NULL; typedef struct vertex_entry { graphid vertex_id; /* vertex id, it is also the hash key */ - ListGraphId *edges_in; /* List of entering edges graphids (int64) */ - ListGraphId *edges_out; /* List of exiting edges graphids (int64) */ - ListGraphId *edges_self; /* List of selfloop edges graphids (int64) */ + VertexEdgeArray edges_in; /* incoming edge graphids (flat array) */ + VertexEdgeArray edges_out; /* outgoing edge graphids (flat array) */ + VertexEdgeArray edges_self; /* self-loop edge graphids (flat array) */ Oid vertex_label_table_oid; /* the label table oid */ ItemPointerData tid; /* physical tuple location for lazy fetch */ } vertex_entry; -/* edge entry for the edge_hashtable */ +/* + * edge entry for the edge_table. + * + * The edge_id is the hash key and is stored in the agehash slot header + * (immediately before the payload). It is intentionally NOT a field on this + * payload struct: duplicating it would add 8 bytes per edge to the slot, + * which on SF10 (~175M edges) is over a gigabyte of overhead. Use + * get_edge_entry_id(ee) when you need the id of an entry returned by + * get_edge_entry / get_edge_entry_with_hash; that helper recovers the key + * from the slot via agehash_key_from_payload. + */ typedef struct edge_entry { - graphid edge_id; /* edge id, it is also the hash key */ Oid edge_label_table_oid; /* the label table oid */ ItemPointerData tid; /* physical tuple location for lazy fetch */ graphid start_vertex_id; /* start vertex */ @@ -131,7 +140,8 @@ typedef struct GRAPH_global_context char *graph_name; /* graph name */ Oid graph_oid; /* graph oid for searching */ HTAB *vertex_hashtable; /* hashtable to hold vertex edge lists */ - HTAB *edge_hashtable; /* hashtable to hold edge to vertex map */ + AgeHashTable *edge_table; /* edge to vertex map (Robin Hood) */ + MemoryContext edge_table_mcxt; /* private context owning edge_table */ uint64 graph_version; /* version counter for cache invalidation */ TransactionId xmin; /* snapshot fallback: transaction xmin */ TransactionId xmax; /* snapshot fallback: transaction xmax */ @@ -155,6 +165,50 @@ typedef struct GRAPH_global_context_container /* global variable to hold the per process GRAPH global contexts */ static GRAPH_global_context_container global_graph_contexts_container = {0}; +/* + * VertexEdgeArray helpers — flat-array adjacency container used by + * vertex_entry's edges_in / edges_out / edges_self. + * + * Growth policy: start at 4 slots on first append, then double on each + * overflow. This keeps the average cost of n appends amortised O(n) and + * keeps the memory waste bounded by 2x. + */ +#define VEA_INITIAL_CAPACITY 4 + +static inline void vea_append(VertexEdgeArray *vea, graphid edge_id) +{ + if (vea->size == vea->capacity) + { + int32 new_capacity = (vea->capacity == 0) + ? VEA_INITIAL_CAPACITY + : vea->capacity * 2; + + if (vea->array == NULL) + { + vea->array = (graphid *) palloc(new_capacity * sizeof(graphid)); + } + else + { + vea->array = (graphid *) repalloc(vea->array, + new_capacity * sizeof(graphid)); + } + + vea->capacity = new_capacity; + } + vea->array[vea->size++] = edge_id; +} + +static inline void vea_free(VertexEdgeArray *vea) +{ + if (vea->array != NULL) + { + pfree(vea->array); + vea->array = NULL; + } + vea->size = 0; + vea->capacity = 0; +} + /* declarations */ /* GRAPH global context functions */ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx); @@ -222,6 +276,57 @@ bool is_ggctx_invalid(GRAPH_global_context *ggctx) ggctx->curcid != snap->curcid); } } +/* + * Fast hash function for graphid (int64) keys. + * + * Replaces dynahash's tag_hash (Jenkins lookup3 → ~17 mixing ops) with the + * MurmurHash3 fmix64 finalizer (5 ops: 3 xorshifts + 2 multiplies). + * + * Quality: fmix64 is the avalanche stage of MurmurHash3 and passes all SMHasher + * tests for 64-bit integer inputs. The output is truncated to uint32 to match + * dynahash's HashValueFunc signature; bits 0..31 of fmix64 are well-mixed. + * + * Performance rationale: graphid lookups dominate hash_search_with_hash_value + * time (≈41% IC1 on SF3). Reducing the per-call mixing cost cuts both insert + * and lookup overhead in age_global_graph and age_vle hashtables. + */ +uint32 graphid_hash(const void *key, Size keysize) +{ + uint64 k; + + /* keysize is always sizeof(int64) at our four call sites; assert in debug. */ + Assert(keysize == sizeof(int64)); + (void) keysize; + + /* graphid keys are stored as int64; load aligned (callers pass &graphid). */ + memcpy(&k, key, sizeof(uint64)); + + /* MurmurHash3 fmix64 (Austin Appleby, public domain). */ + k ^= k >> 33; + k *= UINT64CONST(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= UINT64CONST(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + + return (uint32) k; +} + +/* + * agehash key-equality callback for graphid (int64) keys. + * + * graphid_hash collisions are rare but real (32-bit hash space, billions of + * possible keys), so the equality check has to compare the full 8 bytes. + * memcmp on a fixed 8-byte length compiles to a single load + cmp on x86, + * which is just as fast as an int64 cast and avoids any alignment risk on + * other architectures. + */ +bool graphid_keyeq(const void *a, const void *b, Size keysize) +{ + Assert(keysize == sizeof(int64)); + (void) keysize; + return memcmp(a, b, sizeof(int64)) == 0; +} + /* * Helper function to create the global vertex and edge hashtables. One * hashtable will hold the vertex, its edges (both incoming and exiting) as a @@ -231,49 +336,50 @@ bool is_ggctx_invalid(GRAPH_global_context *ggctx) static void create_GRAPH_global_hashtables(GRAPH_global_context *ggctx) { HASHCTL vertex_ctl; - HASHCTL edge_ctl; char *graph_name = NULL; char *vhn = NULL; - char *ehn = NULL; int glen; int vlen; - int elen; /* get the graph name and length */ graph_name = ggctx->graph_name; glen = strlen(graph_name); /* get the vertex htab name length */ vlen = strlen(VERTEX_HTAB_NAME); - /* get the edge htab name length */ - elen = strlen(EDGE_HTAB_NAME); - /* allocate the space and build the names */ + /* allocate the space and build the name */ vhn = palloc0(vlen + glen + 1); - ehn = palloc0(elen + glen + 1); - /* copy in the names */ strcpy(vhn, VERTEX_HTAB_NAME); - strcpy(ehn, EDGE_HTAB_NAME); - /* add in the graph name */ vhn = strncat(vhn, graph_name, glen); - ehn = strncat(ehn, graph_name, glen); /* initialize the vertex hashtable */ MemSet(&vertex_ctl, 0, sizeof(vertex_ctl)); vertex_ctl.keysize = sizeof(int64); vertex_ctl.entrysize = sizeof(vertex_entry); - vertex_ctl.hash = tag_hash; + vertex_ctl.hash = graphid_hash; ggctx->vertex_hashtable = hash_create(vhn, VERTEX_HTAB_INITIAL_SIZE, &vertex_ctl, HASH_ELEM | HASH_FUNCTION); pfree_if_not_null(vhn); - /* initialize the edge hashtable */ - MemSet(&edge_ctl, 0, sizeof(edge_ctl)); - edge_ctl.keysize = sizeof(int64); - edge_ctl.entrysize = sizeof(edge_entry); - edge_ctl.hash = tag_hash; - ggctx->edge_hashtable = hash_create(ehn, EDGE_HTAB_INITIAL_SIZE, &edge_ctl, - HASH_ELEM | HASH_FUNCTION); - pfree_if_not_null(ehn); + /* + * Initialize the edge_table (agehash, INLINE mode). + * + * Owns its own MemoryContext as a child of CurrentMemoryContext (which, + * at the call site, is TopMemoryContext for the lifetime of the cached + * GRAPH_global_context). Cleanup is a single MemoryContextDelete in + * free_specific_GRAPH_global_context, so an elog during build cannot + * leak slots. + */ + ggctx->edge_table_mcxt = + AllocSetContextCreate(CurrentMemoryContext, + "AGE edge_table", + ALLOCSET_DEFAULT_SIZES); + ggctx->edge_table = agehash_create_inline(ggctx->edge_table_mcxt, + sizeof(graphid), + sizeof(edge_entry), + EDGE_HTAB_INITIAL_SIZE, + graphid_hash, + graphid_keyeq); } /* helper function to get a List of all label names for the specified graph */ @@ -420,10 +526,10 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, bool found = false; /* search for the edge */ - ee = (edge_entry *)hash_search(ggctx->edge_hashtable, (void *)&edge_id, - HASH_ENTER, &found); + ee = (edge_entry *) agehash_insert(ggctx->edge_table, + (void *) &edge_id, &found); - /* if the hash enter returned is NULL, error out */ + /* agehash never returns NULL on insert; a NULL would indicate a bug. */ if (ee == NULL) { elog(ERROR, "insert_edge_entry: hash table returned NULL for ee"); @@ -445,20 +551,17 @@ static bool insert_edge_entry(GRAPH_global_context *ggctx, graphid edge_id, ereport(WARNING, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("previous edge: [id: %ld, start: %ld, end: %ld, label oid: %d]", - ee->edge_id, ee->start_vertex_id, ee->end_vertex_id, + edge_id, ee->start_vertex_id, ee->end_vertex_id, ee->edge_label_table_oid))); return false; } - /* not sure if we really need to zero out the entry, as we set everything */ - MemSet(ee, 0, sizeof(edge_entry)); - /* - * Set the edge id - this is important as this is the hash key value used - * for hash function collisions. + * agehash_insert zero-fills the payload on a fresh insert, so we can fill + * in only the fields we care about. The hash key (edge_id) lives in the + * slot header; recoverable via get_edge_entry_id() if needed. */ - ee->edge_id = edge_id; ee->tid = tid; ee->start_vertex_id = start_vertex_id; ee->end_vertex_id = end_vertex_id; @@ -520,10 +623,10 @@ static bool insert_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id, ve->vertex_label_table_oid = vertex_label_table_oid; /* set the TID for lazy property fetch */ ve->tid = tid; - /* set the NIL edge list */ - ve->edges_in = NULL; - ve->edges_out = NULL; - ve->edges_self = NULL; + /* + * MemSet above already zeroed the embedded VertexEdgeArray fields + * (array=NULL, size=0, capacity=0); no explicit NIL assignment needed. + */ /* we also need to store the vertex id for clean up of vertex lists */ ggctx->vertices = append_graphid(ggctx->vertices, vertex_id); @@ -561,7 +664,7 @@ static bool insert_vertex_edge(GRAPH_global_context *ggctx, */ if (start_found && is_selfloop) { - value->edges_self = append_graphid(value->edges_self, edge_id); + vea_append(&value->edges_self, edge_id); return true; } /* @@ -570,7 +673,7 @@ static bool insert_vertex_edge(GRAPH_global_context *ggctx, */ else if (start_found) { - value->edges_out = append_graphid(value->edges_out, edge_id); + vea_append(&value->edges_out, edge_id); } /* search for the end vertex of the edge */ @@ -584,7 +687,7 @@ static bool insert_vertex_edge(GRAPH_global_context *ggctx, */ if (start_found && end_found) { - value->edges_in = append_graphid(value->edges_in, edge_id); + vea_append(&value->edges_in, edge_id); return true; } /* @@ -836,7 +939,7 @@ static void load_edge_hashtable(GRAPH_global_context *ggctx) static void freeze_GRAPH_global_hashtables(GRAPH_global_context *ggctx) { hash_freeze(ggctx->vertex_hashtable); - hash_freeze(ggctx->edge_hashtable); + agehash_freeze(ggctx->edge_table); } /* @@ -885,14 +988,10 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) return false; } - /* free the edge list associated with this vertex */ - free_ListGraphId(value->edges_in); - free_ListGraphId(value->edges_out); - free_ListGraphId(value->edges_self); - - value->edges_in = NULL; - value->edges_out = NULL; - value->edges_self = NULL; + /* free the edge arrays associated with this vertex */ + vea_free(&value->edges_in); + vea_free(&value->edges_out); + vea_free(&value->edges_self); /* move to the next vertex */ curr_vertex = next_vertex; @@ -904,10 +1003,18 @@ static bool free_specific_GRAPH_global_context(GRAPH_global_context *ggctx) /* free the hashtables */ hash_destroy(ggctx->vertex_hashtable); - hash_destroy(ggctx->edge_hashtable); + /* + * The edge_table and all of its slots live entirely inside + * edge_table_mcxt, so a single MemoryContextDelete reclaims them. + */ + if (ggctx->edge_table_mcxt != NULL) + { + MemoryContextDelete(ggctx->edge_table_mcxt); + } ggctx->vertex_hashtable = NULL; - ggctx->edge_hashtable = NULL; + ggctx->edge_table = NULL; + ggctx->edge_table_mcxt = NULL; /* free the context */ pfree_if_not_null(ggctx); @@ -1201,17 +1308,33 @@ vertex_entry *get_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id) return ve; } -/* helper function to retrieve an edge_entry from the graph's edge hash table */ +/* helper function to retrieve an edge_entry from the graph's edge table */ edge_entry *get_edge_entry(GRAPH_global_context *ggctx, graphid edge_id) { - edge_entry *ee = NULL; - bool found = false; + edge_entry *ee; - /* retrieve the current edge entry */ - ee = (edge_entry *)hash_search(ggctx->edge_hashtable, (void *)&edge_id, - HASH_FIND, &found); + ee = (edge_entry *) agehash_lookup(ggctx->edge_table, (void *) &edge_id); /* it should be found, otherwise we have problems */ - Assert(found); + Assert(ee != NULL); + + return ee; +} + +/* + * Variant of get_edge_entry that uses a precomputed hash value to skip the + * agehash internal hash callback. The caller is responsible for ensuring + * hashvalue == graphid_hash(&edge_id, sizeof(int64)). Used by the VLE DFS + * hot loop where the same edge_id is also looked up in edge_state_hashtable. + */ +edge_entry *get_edge_entry_with_hash(GRAPH_global_context *ggctx, + graphid edge_id, uint32 hashvalue) +{ + edge_entry *ee; + + ee = (edge_entry *) agehash_lookup_with_hash(ggctx->edge_table, + (void *) &edge_id, + hashvalue); + Assert(ee != NULL); return ee; } @@ -1264,19 +1387,19 @@ graphid get_vertex_entry_id(vertex_entry *ve) return ve->vertex_id; } -ListGraphId *get_vertex_entry_edges_in(vertex_entry *ve) +VertexEdgeArray *get_vertex_entry_edges_in_array(vertex_entry *ve) { - return ve->edges_in; + return &ve->edges_in; } -ListGraphId *get_vertex_entry_edges_out(vertex_entry *ve) +VertexEdgeArray *get_vertex_entry_edges_out_array(vertex_entry *ve) { - return ve->edges_out; + return &ve->edges_out; } -ListGraphId *get_vertex_entry_edges_self(vertex_entry *ve) +VertexEdgeArray *get_vertex_entry_edges_self_array(vertex_entry *ve) { - return ve->edges_self; + return &ve->edges_self; } @@ -1343,7 +1466,15 @@ Datum get_vertex_entry_properties(vertex_entry *ve) /* edge_entry accessor functions */ graphid get_edge_entry_id(edge_entry *ee) { - return ee->edge_id; + /* + * The edge_id is stored as the agehash slot key, immediately preceding + * the payload pointer we hand back as `edge_entry *`. Recover it via + * the public agehash_key_from_payload helper to avoid a redundant + * 8-byte field on every entry (saves ~400MB on SF3, ~1.4GB on SF10). + */ + graphid k; + memcpy(&k, agehash_key_from_payload(ee, sizeof(graphid)), sizeof(graphid)); + return k; } Oid get_edge_entry_label_table_oid(edge_entry *ee) @@ -1450,7 +1581,7 @@ Datum age_vertex_stats(PG_FUNCTION_ARGS) { GRAPH_global_context *ggctx = NULL; vertex_entry *ve = NULL; - ListGraphId *edges = NULL; + VertexEdgeArray *edges = NULL; agtype_value *agtv_vertex = NULL; agtype_value *agtv_temp = NULL; agtype_value agtv_integer; @@ -1530,24 +1661,24 @@ Datum age_vertex_stats(PG_FUNCTION_ARGS) agtv_temp->val.int_value = 0; /* get and store the self_loops */ - edges = get_vertex_entry_edges_self(ve); - self_loops = (edges != NULL) ? get_list_size(edges) : 0; + edges = get_vertex_entry_edges_self_array(ve); + self_loops = edges->size; agtv_temp->val.int_value = self_loops; result.res = push_agtype_value(&result.parse_state, WAGT_KEY, string_to_agtype_value("self_loops")); result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, agtv_temp); /* get and store the in_degree */ - edges = get_vertex_entry_edges_in(ve); - degree = (edges != NULL) ? get_list_size(edges) : 0; + edges = get_vertex_entry_edges_in_array(ve); + degree = edges->size; agtv_temp->val.int_value = degree + self_loops; result.res = push_agtype_value(&result.parse_state, WAGT_KEY, string_to_agtype_value("in_degree")); result.res = push_agtype_value(&result.parse_state, WAGT_VALUE, agtv_temp); /* get and store the out_degree */ - edges = get_vertex_entry_edges_out(ve); - degree = (edges != NULL) ? get_list_size(edges) : 0; + edges = get_vertex_entry_edges_out_array(ve); + degree = edges->size; agtv_temp->val.int_value = degree + self_loops; result.res = push_agtype_value(&result.parse_state, WAGT_KEY, string_to_agtype_value("out_degree")); diff --git a/src/backend/utils/adt/age_vle.c b/src/backend/utils/adt/age_vle.c index 22c268cdf..1bde49139 100644 --- a/src/backend/utils/adt/age_vle.c +++ b/src/backend/utils/adt/age_vle.c @@ -163,8 +163,9 @@ static VLE_local_context *build_local_vle_context(FunctionCallInfo fcinfo, static void create_VLE_local_state_hashtable(VLE_local_context *vlelctx); static void free_VLE_local_context(VLE_local_context *vlelctx); /* VLE graph traversal functions */ -static edge_state_entry *get_edge_state(VLE_local_context *vlelctx, - graphid edge_id); +static edge_state_entry *get_edge_state_with_hash(VLE_local_context *vlelctx, + graphid edge_id, + uint32 hashvalue); /* graphid data structures */ static void load_initial_dfs_stacks(VLE_local_context *vlelctx); static bool dfs_find_a_path_between(VLE_local_context *vlelctx); @@ -358,7 +359,7 @@ static void create_VLE_local_state_hashtable(VLE_local_context *vlelctx) MemSet(&edge_state_ctl, 0, sizeof(edge_state_ctl)); edge_state_ctl.keysize = sizeof(int64); edge_state_ctl.entrysize = sizeof(edge_state_entry); - edge_state_ctl.hash = tag_hash; + edge_state_ctl.hash = graphid_hash; vlelctx->edge_state_hashtable = hash_create(eshn, EDGE_STATE_HTAB_INITIAL_SIZE, &edge_state_ctl, @@ -900,23 +901,24 @@ static VLE_local_context *build_local_vle_context(FunctionCallInfo fcinfo, } /* - * Helper function to get the specified edge's state. If it does not find it, it - * creates and initializes it. + * Helper function to get the specified edge's state, using a precomputed hash + * value. The dynahash table keyed on graphid is shared with edge_hashtable + * elsewhere, so callers can compute graphid_hash() once and reuse it for + * lookups in both tables. */ -static edge_state_entry *get_edge_state(VLE_local_context *vlelctx, - graphid edge_id) +static edge_state_entry *get_edge_state_with_hash(VLE_local_context *vlelctx, + graphid edge_id, + uint32 hashvalue) { edge_state_entry *ese = NULL; bool found = false; - /* retrieve the edge_state_entry from the edge state hashtable */ - ese = (edge_state_entry *)hash_search(vlelctx->edge_state_hashtable, - (void *)&edge_id, HASH_ENTER, &found); - - /* if it isn't found, it needs to be created and initialized */ + ese = (edge_state_entry *)hash_search_with_hash_value( + vlelctx->edge_state_hashtable, + (void *)&edge_id, hashvalue, + HASH_ENTER, &found); if (!found) { - /* the edge id is also the hash key for resolving collisions */ ese->edge_id = edge_id; ese->used_in_path = false; ese->has_been_matched = false; @@ -1015,11 +1017,18 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) edge_state_entry *ese = NULL; edge_entry *ee = NULL; bool found = false; + uint32 edge_hashvalue; /* get an edge, but leave it on the stack for now */ edge_id = gid_stack_peek(edge_stack); + /* + * Compute the hash for edge_id once and reuse it for both the + * edge_state_hashtable lookup and (later) the edge_hashtable lookup. + * Both tables key on graphid using graphid_hash(). + */ + edge_hashvalue = graphid_hash(&edge_id, sizeof(int64)); /* get the edge's state */ - ese = get_edge_state(vlelctx, edge_id); + ese = get_edge_state_with_hash(vlelctx, edge_id, edge_hashvalue); /* * If the edge is already in use, it means that the edge is in the path. * So, we need to see if it is the last path entry (we are backing up - @@ -1067,7 +1076,7 @@ static bool dfs_find_a_path_between(VLE_local_context *vlelctx) gid_stack_push(path_stack, edge_id); /* now get the edge entry so we can get the next vertex to move to */ - ee = get_edge_entry(vlelctx->ggctx, edge_id); + ee = get_edge_entry_with_hash(vlelctx->ggctx, edge_id, edge_hashvalue); next_vertex_id = get_next_vertex(vlelctx, ee); /* @@ -1143,11 +1152,18 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) edge_state_entry *ese = NULL; edge_entry *ee = NULL; bool found = false; + uint32 edge_hashvalue; /* get an edge, but leave it on the stack for now */ edge_id = gid_stack_peek(edge_stack); + /* + * Compute the hash for edge_id once and reuse it for both the + * edge_state_hashtable lookup and (later) the edge_hashtable lookup. + * Both tables key on graphid using graphid_hash(). + */ + edge_hashvalue = graphid_hash(&edge_id, sizeof(int64)); /* get the edge's state */ - ese = get_edge_state(vlelctx, edge_id); + ese = get_edge_state_with_hash(vlelctx, edge_id, edge_hashvalue); /* * If the edge is already in use, it means that the edge is in the path. * So, we need to see if it is the last path entry (we are backing up - @@ -1195,7 +1211,7 @@ static bool dfs_find_a_path_from(VLE_local_context *vlelctx) gid_stack_push(path_stack, edge_id); /* now get the edge entry so we can get the next vertex to move to */ - ee = get_edge_entry(vlelctx->ggctx, edge_id); + ee = get_edge_entry_with_hash(vlelctx->ggctx, edge_id, edge_hashvalue); next_vertex_id = get_next_vertex(vlelctx, ee); /* @@ -1259,16 +1275,50 @@ static bool is_edge_in_path(VLE_local_context *vlelctx, graphid edge_id) * * Note: The vertex must exist. */ +/* + * Batched candidate buffer size for the adjacency lookup pipeline below. + * 8 was chosen because it comfortably fits within the OoO window and the + * per-core L1 MSHR count of modern Xeons (12+), so the K back-to-back + * dynahash bucket misses overlap in a single MLP wave. + */ +#define VLE_LOOKUP_BATCH 8 + static void add_valid_vertex_edges(VLE_local_context *vlelctx, graphid vertex_id) { GraphIdStack *vertex_stack = NULL; GraphIdStack *edge_stack = NULL; - ListGraphId *edges = NULL; vertex_entry *ve = NULL; - GraphIdNode *edge_in = NULL; - GraphIdNode *edge_out = NULL; - GraphIdNode *edge_self = NULL; + /* + * Three flat-array adjacency lists, walked in parallel via integer + * indices. An empty (or direction-disabled) list has size == 0 so its + * branch never fires. This replaces the previous GraphIdNode pointer + * walk with a contiguous-memory traversal — significantly better for + * cache and branch-predictor behaviour on the DFS hot path. + */ + graphid *arr_out = NULL; + int32 sz_out = 0; + int32 idx_out = 0; + graphid *arr_in = NULL; + int32 sz_in = 0; + int32 idx_in = 0; + graphid *arr_self = NULL; + int32 sz_self = 0; + int32 idx_self = 0; + VertexEdgeArray *vea = NULL; + + /* + * Per-batch scratch arrays for the MLP lookup pipeline. Each iteration + * gathers up to VLE_LOOKUP_BATCH not-already-in-path candidate edges, + * then issues their edge_hashtable and edge_state_hashtable lookups in + * two tight back-to-back loops. The CPU's out-of-order engine overlaps + * the K independent dynahash bucket misses inside each loop, hiding + * memory latency that the original one-edge-at-a-time loop serialized. + */ + graphid batch_eids[VLE_LOOKUP_BATCH]; + uint32 batch_hashes[VLE_LOOKUP_BATCH]; + edge_entry *batch_ee[VLE_LOOKUP_BATCH]; + edge_state_entry *batch_ese[VLE_LOOKUP_BATCH]; /* get the vertex entry */ ve = get_vertex_entry(vlelctx->ggctx, vertex_id); @@ -1282,82 +1332,127 @@ static void add_valid_vertex_edges(VLE_local_context *vlelctx, vertex_stack = vlelctx->dfs_vertex_stack; edge_stack = vlelctx->dfs_edge_stack; - /* set to the first edge for each edge list for the specified direction */ + /* set up walked arrays for the requested direction(s) */ if (vlelctx->edge_direction == CYPHER_REL_DIR_RIGHT || vlelctx->edge_direction == CYPHER_REL_DIR_NONE) { - edges = get_vertex_entry_edges_out(ve); - edge_out = (edges != NULL) ? get_list_head(edges) : NULL; + vea = get_vertex_entry_edges_out_array(ve); + arr_out = vea->array; + sz_out = vea->size; } if (vlelctx->edge_direction == CYPHER_REL_DIR_LEFT || vlelctx->edge_direction == CYPHER_REL_DIR_NONE) { - edges = get_vertex_entry_edges_in(ve); - edge_in = (edges != NULL) ? get_list_head(edges) : NULL; + vea = get_vertex_entry_edges_in_array(ve); + arr_in = vea->array; + sz_in = vea->size; } - /* set to the first selfloop edge */ - edges = get_vertex_entry_edges_self(ve); - edge_self = (edges != NULL) ? get_list_head(edges) : NULL; + /* selfloops are always traversed */ + vea = get_vertex_entry_edges_self_array(ve); + arr_self = vea->array; + sz_self = vea->size; - /* add in valid vertex edges */ - while (edge_out != NULL || edge_in != NULL || edge_self != NULL) + /* + * Outer loop: drain the three flat arrays via a 5-phase pipeline. + * 1. Gather: pull up to VLE_LOOKUP_BATCH next edge_ids that survive + * the cheap is_edge_in_path() early-skip. + * 2. Hash: compute graphid_hash for the batch (pure compute). + * 3. Lookup: K back-to-back edge_hashtable HASH_FIND calls — MLP + * window 1 (the CPU overlaps the K bucket misses). + * 4. State: K back-to-back edge_state_hashtable HASH_ENTER calls — + * MLP window 2 (different table, different bucket misses). + * 5. Apply: per-edge match/state-update/stack-push, now operating + * on cache-warm ee/ese pointers. + * Phase 5 preserves the exact processing order of the original loop + * (out direction first, then in, then self), so DFS stack ordering and + * therefore path enumeration are identical to the previous version. + */ + while (idx_out < sz_out || idx_in < sz_in || idx_self < sz_self) { - edge_entry *ee = NULL; - edge_state_entry *ese = NULL; - graphid edge_id; + int batch_n = 0; + int i; - /* get the edge_id from the next available edge*/ - if (edge_out != NULL) - { - edge_id = get_graphid(edge_out); - } - else if (edge_in != NULL) - { - edge_id = get_graphid(edge_in); - } - else + /* Phase 1: gather */ + while (batch_n < VLE_LOOKUP_BATCH && + (idx_out < sz_out || idx_in < sz_in || idx_self < sz_self)) { - edge_id = get_graphid(edge_self); - } + graphid edge_id; - /* - * This is a fast existence check, relative to the hash search, for when - * the path stack is small. If the edge is in the path, we skip it. - */ - if (gid_stack_size(vlelctx->dfs_path_stack) < 10 && - is_edge_in_path(vlelctx, edge_id)) - { - /* set to the next available edge */ - if (edge_out != NULL) + if (idx_out < sz_out) { - edge_out = next_GraphIdNode(edge_out); + edge_id = arr_out[idx_out++]; } - else if (edge_in != NULL) + else if (idx_in < sz_in) { - edge_in = next_GraphIdNode(edge_in); + edge_id = arr_in[idx_in++]; } else { - edge_self = next_GraphIdNode(edge_self); + edge_id = arr_self[idx_self++]; } - continue; + + /* + * Fast early-skip when the path stack is small: avoids two + * dynahash lookups for edges already on the path. + */ + if (gid_stack_size(vlelctx->dfs_path_stack) < 10 && + is_edge_in_path(vlelctx, edge_id)) + { + continue; + } + + batch_eids[batch_n++] = edge_id; } - /* get the edge entry */ - ee = get_edge_entry(vlelctx->ggctx, edge_id); - /* it better exist */ - if (ee == NULL) + if (batch_n == 0) { - elog(ERROR, "add_valid_vertex_edges: no edge found"); + break; } - /* get its state */ - ese = get_edge_state(vlelctx, edge_id); - /* - * Don't add any edges that we have already seen because they will - * cause a loop to form. - */ - if (!ese->used_in_path) + + /* Phase 2: compute hashes (pure compute, no misses) */ + for (i = 0; i < batch_n; i++) { + batch_hashes[i] = graphid_hash(&batch_eids[i], sizeof(int64)); + } + + /* Phase 3: K back-to-back edge_hashtable lookups (MLP wave 1) */ + for (i = 0; i < batch_n; i++) + { + batch_ee[i] = get_edge_entry_with_hash(vlelctx->ggctx, + batch_eids[i], + batch_hashes[i]); + } + + /* Phase 4: K back-to-back edge_state_hashtable lookups (MLP wave 2) */ + for (i = 0; i < batch_n; i++) + { + batch_ese[i] = get_edge_state_with_hash(vlelctx, + batch_eids[i], + batch_hashes[i]); + } + + /* Phase 5: process the batch sequentially */ + for (i = 0; i < batch_n; i++) + { + edge_entry *ee = batch_ee[i]; + edge_state_entry *ese = batch_ese[i]; + graphid edge_id = batch_eids[i]; + + /* it better exist */ + if (ee == NULL) + { + elog(ERROR, "add_valid_vertex_edges: no edge found"); + } + + /* + * Don't add any edges that we have already seen because they + * will cause a loop to form. + */ + if (ese->used_in_path) + { + continue; + } + /* validate the edge if it hasn't been already */ if (!ese->has_been_matched && is_an_edge_match(vlelctx, ee)) { @@ -1369,37 +1464,25 @@ static void add_valid_vertex_edges(VLE_local_context *vlelctx, ese->has_been_matched = true; ese->matched = false; } + /* if it is a match, add it */ if (ese->has_been_matched && ese->matched) { /* - * We need to maintain our source vertex for each edge added - * if the edge_direction is CYPHER_REL_DIR_NONE. This is due - * to the edges having a fixed direction and the dfs + * We need to maintain our source vertex for each edge + * added if the edge_direction is CYPHER_REL_DIR_NONE. This + * is due to the edges having a fixed direction and the dfs * algorithm working strictly through edges. With an * un-directional VLE edge, you don't know the vertex that * you just came from. So, we need to store it. */ - if (vlelctx->edge_direction == CYPHER_REL_DIR_NONE) - { - gid_stack_push(vertex_stack, get_vertex_entry_id(ve)); - } - gid_stack_push(edge_stack, edge_id); + if (vlelctx->edge_direction == CYPHER_REL_DIR_NONE) + { + gid_stack_push(vertex_stack, get_vertex_entry_id(ve)); + } + gid_stack_push(edge_stack, edge_id); } } - /* get the next working edge */ - if (edge_out != NULL) - { - edge_out = next_GraphIdNode(edge_out); - } - else if (edge_in != NULL) - { - edge_in = next_GraphIdNode(edge_in); - } - else - { - edge_self = next_GraphIdNode(edge_self); - } } } @@ -2613,7 +2696,7 @@ Datum _ag_enforce_edge_uniqueness(PG_FUNCTION_ARGS) MemSet(&exists_ctl, 0, sizeof(exists_ctl)); exists_ctl.keysize = sizeof(int64); exists_ctl.entrysize = sizeof(int64); - exists_ctl.hash = tag_hash; + exists_ctl.hash = graphid_hash; /* create exists_hash table */ exists_hash = hash_create(EXISTS_HTAB_NAME, EXISTS_HTAB_NAME_INITIAL_SIZE, diff --git a/src/backend/utils/cache/agehash.c b/src/backend/utils/cache/agehash.c new file mode 100644 index 000000000..84915cb7b --- /dev/null +++ b/src/backend/utils/cache/agehash.c @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * agehash.c - Robin Hood open-addressing hashtable for AGE. + * + * See agehash.h for the public contract. This file implements the INLINE + * mode only. + * + * Internal slot layout (INLINE): + * + * bytes 0..1 uint16 probe_dist (AGEHASH_EMPTY = 0xFFFF marks empty) + * bytes 2..3 uint16 reserved (future tombstone / flag bits) + * bytes 4..7 uint32 pad (forces key to 8-byte alignment) + * bytes 8..K+7 key + * bytes K+8.. payload + * + * slot_size = MAXALIGN(8 + key_size + payload_size). + */ + +#include "postgres.h" + +#include "fmgr.h" +#include "utils/agehash.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + +/* ------------------------------------------------------------------------- */ + +struct AgeHashTable +{ + /* Slot array: capacity * slot_size bytes, palloc'd in mcxt. */ + char *slots; + uint32 capacity; /* always a power of two */ + uint32 capacity_mask; /* capacity - 1 */ + uint32 size; /* live entries */ + uint32 max_size; /* size at which we grow */ + uint32 slot_size; /* total bytes per slot */ + uint32 key_size; + uint32 payload_size; + uint32 payload_offset; /* AGEHASH_SLOT_KEY_OFFSET + key_size */ + AgeHashMode mode; + bool frozen; + agehash_hash_fn hash_fn; + agehash_keyeq_fn keyeq_fn; + MemoryContext mcxt; +}; + +/* ------------------------------------------------------------------------- */ +/* Slot accessors. */ + +static inline char * +slot_at(AgeHashTable *t, uint32 idx) +{ + return t->slots + (Size) idx * t->slot_size; +} + +static inline uint16 +slot_probe_dist(const char *slot) +{ + uint16 d; + memcpy(&d, slot, sizeof(uint16)); + return d; +} + +static inline void +slot_set_probe_dist(char *slot, uint16 d) +{ + memcpy(slot, &d, sizeof(uint16)); +} + +static inline char * +slot_key_ptr(AgeHashTable *t, char *slot) +{ + (void) t; + return slot + AGEHASH_SLOT_KEY_OFFSET; +} + +static inline char * +slot_payload_ptr(AgeHashTable *t, char *slot) +{ + return slot + t->payload_offset; +} + +/* ------------------------------------------------------------------------- */ +/* Construction. */ + +static uint32 +next_pow2(uint32 v) +{ + uint32 p = 1; + while (p < v) + p <<= 1; + return p; +} + +AgeHashTable * +agehash_create_inline(MemoryContext mcxt, + Size key_size, + Size payload_size, + uint32 capacity_hint, + agehash_hash_fn hash_fn, + agehash_keyeq_fn keyeq_fn) +{ + AgeHashTable *t; + MemoryContext oldctx; + uint32 min_cap; + uint32 cap; + + Assert(mcxt != NULL); + Assert(key_size > 0 && key_size <= 64); + Assert(payload_size > 0 && payload_size <= 4096); + Assert(hash_fn != NULL); + Assert(keyeq_fn != NULL); + + oldctx = MemoryContextSwitchTo(mcxt); + + t = palloc0(sizeof(AgeHashTable)); + t->mcxt = mcxt; + t->mode = AGEHASH_INLINE; + t->frozen = false; + t->hash_fn = hash_fn; + t->keyeq_fn = keyeq_fn; + t->key_size = (uint32) key_size; + t->payload_size = (uint32) payload_size; + t->payload_offset = AGEHASH_SLOT_KEY_OFFSET + (uint32) key_size; + t->slot_size = MAXALIGN(t->payload_offset + (uint32) payload_size); + + /* + * Capacity floor of 64 keeps tiny tables out of degenerate-load territory + * and avoids a flurry of grows on the first few inserts. + */ + if (capacity_hint == 0) + min_cap = 64; + else + { + /* size capacity_hint at MAX_LOAD so we don't immediately grow */ + min_cap = (uint32) ((double) capacity_hint / AGEHASH_MAX_LOAD) + 1; + if (min_cap < 64) + min_cap = 64; + } + cap = next_pow2(min_cap); + Assert((cap & (cap - 1)) == 0); + + t->capacity = cap; + t->capacity_mask = cap - 1; + t->size = 0; + t->max_size = (uint32) ((double) cap * AGEHASH_MAX_LOAD); + /* + * The slot array can comfortably exceed 1 GiB on production graphs + * (SF3 ldbc_snb edge_table is ~3 GiB at 0.7 load). Use the HUGE + * allocator to bypass the standard MaxAllocSize check. + */ + t->slots = (char *) MemoryContextAllocHuge(mcxt, + (Size) cap * t->slot_size); + + /* Mark every slot empty. */ + { + uint32 i; + for (i = 0; i < cap; i++) + slot_set_probe_dist(slot_at(t, i), AGEHASH_EMPTY); + } + + MemoryContextSwitchTo(oldctx); + return t; +} + +/* ------------------------------------------------------------------------- */ +/* Insert. Robin Hood with rich-poor swap. */ + +static void agehash_grow(AgeHashTable *t); + +static void * +agehash_insert_internal(AgeHashTable *t, const void *key, uint32 hashvalue, + bool *found) +{ + uint32 i; + uint16 d; + /* + * Carrier for the entry currently being placed. Starts as the caller's + * key with a fresh, zero-filled payload; gets overwritten on each + * Robin Hood swap. + */ + char carry_key[64]; + char carry_payload[4096]; + void *result_payload = NULL; + bool placed_caller = false; + + Assert(!t->frozen); + Assert(t->key_size <= sizeof(carry_key)); + Assert(t->payload_size <= sizeof(carry_payload)); + + /* Grow before insert if at threshold. */ + if (t->size >= t->max_size) + agehash_grow(t); + + /* Initialize carry buffers with the caller's key and an empty payload. */ + memcpy(carry_key, key, t->key_size); + memset(carry_payload, 0, t->payload_size); + + i = hashvalue & t->capacity_mask; + d = 0; + + for (;;) + { + char *slot = slot_at(t, i); + uint16 sd = slot_probe_dist(slot); + + if (sd == AGEHASH_EMPTY) + { + /* Place the carrier here and we're done. */ + slot_set_probe_dist(slot, d); + memcpy(slot_key_ptr(t, slot), carry_key, t->key_size); + memcpy(slot_payload_ptr(t, slot), carry_payload, t->payload_size); + t->size++; + if (!placed_caller) + { + /* The caller's slot landed here. */ + if (found != NULL) + *found = false; + return slot_payload_ptr(t, slot); + } + /* + * The caller was placed earlier via a swap; result_payload + * already points at their final slot. + */ + Assert(result_payload != NULL); + return result_payload; + } + + if (sd == d && + !placed_caller && + t->keyeq_fn(slot_key_ptr(t, slot), carry_key, t->key_size)) + { + /* + * Existing entry with the caller's key. Note: this match check + * is only relevant before we've performed a swap; once we've + * placed the caller into a slot, the key in `carry` is some + * displaced entry that, by RH invariant on insert from a fresh + * key, cannot already exist in the table. + */ + if (found != NULL) + *found = true; + return slot_payload_ptr(t, slot); + } + + if (sd < d) + { + /* + * Rich-poor swap: this slot's owner is closer to its ideal + * bucket than we are. Take its place and continue with the + * displaced entry. If we have not yet placed the caller, this + * is where they end up; remember the pointer so we can return + * it once the displaced chain finishes. + */ + char tmp_key[64]; + char tmp_payload[4096]; + uint16 tmp_d = sd; + + memcpy(tmp_key, slot_key_ptr(t, slot), t->key_size); + memcpy(tmp_payload, slot_payload_ptr(t, slot), t->payload_size); + + slot_set_probe_dist(slot, d); + memcpy(slot_key_ptr(t, slot), carry_key, t->key_size); + memcpy(slot_payload_ptr(t, slot), carry_payload, t->payload_size); + + if (!placed_caller) + { + placed_caller = true; + result_payload = slot_payload_ptr(t, slot); + /* Notify caller: this insert is a fresh entry. */ + if (found != NULL) + { + *found = false; + found = NULL; /* don't write again */ + } + } + + /* Continue with the displaced entry as the new carrier. */ + memcpy(carry_key, tmp_key, t->key_size); + memcpy(carry_payload, tmp_payload, t->payload_size); + d = tmp_d; + } + + i = (i + 1) & t->capacity_mask; + d++; + + /* + * Probe distance overflow guard. With AGEHASH_MAX_LOAD = 0.7 and a + * non-degenerate hash function, max probe is empirically <= 32. + * The 0xFE00 ceiling reserves headroom while leaving probe_dist + * well clear of the AGEHASH_EMPTY sentinel. + */ + Assert(d < 0xFE00); + if (unlikely(d >= 0xFE00)) + elog(ERROR, "agehash: probe distance overflow (likely a bad hash function)"); + } +} + +void * +agehash_insert(AgeHashTable *t, const void *key, bool *found) +{ + uint32 h = t->hash_fn(key, t->key_size); + return agehash_insert_internal(t, key, h, found); +} + +void * +agehash_insert_with_hash(AgeHashTable *t, const void *key, + uint32 hashvalue, bool *found) +{ + return agehash_insert_internal(t, key, hashvalue, found); +} + +/* ------------------------------------------------------------------------- */ +/* Grow: double the capacity and rehash. */ + +static void +agehash_grow(AgeHashTable *t) +{ + char *old_slots; + uint32 old_cap; + uint32 old_slot_size; + uint32 new_cap; + MemoryContext oldctx; + uint32 i; + + Assert(!t->frozen); + + old_slots = t->slots; + old_cap = t->capacity; + old_slot_size = t->slot_size; + + new_cap = old_cap << 1; + Assert(new_cap > old_cap); /* overflow guard */ + + oldctx = MemoryContextSwitchTo(t->mcxt); + + t->capacity = new_cap; + t->capacity_mask = new_cap - 1; + t->max_size = (uint32) ((double) new_cap * AGEHASH_MAX_LOAD); + /* HUGE allocator: see agehash_create_inline for the rationale. */ + t->slots = (char *) MemoryContextAllocHuge(t->mcxt, + (Size) new_cap * t->slot_size); + for (i = 0; i < new_cap; i++) + slot_set_probe_dist(slot_at(t, i), AGEHASH_EMPTY); + + /* Reset size; we re-insert below (which will increment it). */ + t->size = 0; + for (i = 0; i < old_cap; i++) + { + char *src = old_slots + (Size) i * old_slot_size; + if (slot_probe_dist(src) != AGEHASH_EMPTY) + { + void *src_key = src + AGEHASH_SLOT_KEY_OFFSET; + void *src_payload = src + AGEHASH_SLOT_KEY_OFFSET + t->key_size; + uint32 h = t->hash_fn(src_key, t->key_size); + void *dst_payload = agehash_insert_internal(t, src_key, h, NULL); + memcpy(dst_payload, src_payload, t->payload_size); + } + } + + pfree(old_slots); + MemoryContextSwitchTo(oldctx); +} + +/* ------------------------------------------------------------------------- */ +/* Lookup. */ + +void * +agehash_lookup_with_hash(AgeHashTable *t, const void *key, uint32 hashvalue) +{ + uint32 i = hashvalue & t->capacity_mask; + uint16 d = 0; + + for (;;) + { + char *slot = slot_at(t, i); + uint16 sd = slot_probe_dist(slot); + + if (sd == AGEHASH_EMPTY) + return NULL; + /* + * Robin Hood invariant: probe_dist values along a probe sequence + * are non-increasing as we move from an entry's home slot. If the + * slot we land on has a smaller probe_dist than ours, the key + * we're looking for can't be anywhere later in the sequence. + */ + if (sd < d) + return NULL; + if (t->keyeq_fn(slot_key_ptr(t, slot), key, t->key_size)) + return slot_payload_ptr(t, slot); + + i = (i + 1) & t->capacity_mask; + d++; + Assert(d < 0xFE00); + } +} + +void * +agehash_lookup(AgeHashTable *t, const void *key) +{ + uint32 h = t->hash_fn(key, t->key_size); + return agehash_lookup_with_hash(t, key, h); +} + +/* ------------------------------------------------------------------------- */ +/* Misc accessors. */ + +void +agehash_freeze(AgeHashTable *t) +{ + t->frozen = true; +} + +bool +agehash_is_frozen(const AgeHashTable *t) +{ + return t->frozen; +} + +uint32 +agehash_size(const AgeHashTable *t) +{ + return t->size; +} + +uint32 +agehash_capacity(const AgeHashTable *t) +{ + return t->capacity; +} + +void +agehash_iter_init(AgeHashTable *t, AgeHashIter *it) +{ + it->t = t; + it->idx = 0; + it->key = NULL; + it->payload = NULL; +} + +bool +agehash_iter_next(AgeHashIter *it) +{ + AgeHashTable *t = it->t; + while (it->idx < t->capacity) + { + char *slot = slot_at(t, it->idx); + uint32 idx = it->idx++; + (void) idx; + if (slot_probe_dist(slot) != AGEHASH_EMPTY) + { + it->key = slot_key_ptr(t, slot); + it->payload = slot_payload_ptr(t, slot); + return true; + } + } + it->key = NULL; + it->payload = NULL; + return false; +} + +/* ------------------------------------------------------------------------- */ +/* Self-test. Exercises insert / lookup / grow / iterate at small + medium + * sizes and verifies invariants. Returns a string in CurrentMemoryContext. */ + +/* MurmurHash3 fmix64, identical to graphid_hash. */ +static uint32 +selftest_hash(const void *key, Size keysize) +{ + uint64 k; + Assert(keysize == sizeof(uint64)); + memcpy(&k, key, sizeof(uint64)); + k ^= k >> 33; + k *= UINT64CONST(0xff51afd7ed558ccd); + k ^= k >> 33; + k *= UINT64CONST(0xc4ceb9fe1a85ec53); + k ^= k >> 33; + return (uint32) k; +} + +static bool +selftest_keyeq(const void *a, const void *b, Size keysize) +{ + return memcmp(a, b, keysize) == 0; +} + +typedef struct selftest_payload +{ + uint64 mirror_key; + uint64 marker; +} selftest_payload; + +static const char * +selftest_run_one(MemoryContext parent, uint32 n, uint32 hint) +{ + MemoryContext mcxt; + AgeHashTable *t; + selftest_payload *p; + bool found; + uint32 i; + uint32 seen; + AgeHashIter it; + + mcxt = AllocSetContextCreate(parent, "agehash selftest", ALLOCSET_DEFAULT_SIZES); + t = agehash_create_inline(mcxt, sizeof(uint64), sizeof(selftest_payload), + hint, selftest_hash, selftest_keyeq); + + /* Insert n keys. */ + for (i = 0; i < n; i++) + { + uint64 k = ((uint64) 0xa5a5 << 48) | (i + 1); + p = (selftest_payload *) agehash_insert(t, &k, &found); + if (found) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: duplicate insert at i=%u", i); + } + p->mirror_key = k; + p->marker = (uint64) 0xdeadbeef00000000ULL | i; + } + if (agehash_size(t) != n) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: size %u != %u after inserts", + agehash_size(t), n); + } + + /* Lookup all n keys. */ + for (i = 0; i < n; i++) + { + uint64 k = ((uint64) 0xa5a5 << 48) | (i + 1); + p = (selftest_payload *) agehash_lookup(t, &k); + if (p == NULL) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: lookup miss at i=%u", i); + } + if (p->mirror_key != k || + p->marker != ((uint64) 0xdeadbeef00000000ULL | i)) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: payload corruption at i=%u", i); + } + } + + /* Lookup n keys that should not exist. */ + for (i = 0; i < n; i++) + { + uint64 k = ((uint64) 0xb6b6 << 48) | (i + 1); + p = (selftest_payload *) agehash_lookup(t, &k); + if (p != NULL) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: phantom lookup hit at i=%u", i); + } + } + + /* Re-insert (HASH_ENTER semantics) — should report found = true. */ + for (i = 0; i < n; i++) + { + uint64 k = ((uint64) 0xa5a5 << 48) | (i + 1); + p = (selftest_payload *) agehash_insert(t, &k, &found); + if (!found) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: re-insert reported !found at i=%u", i); + } + if (p->mirror_key != k) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: re-insert payload mismatch at i=%u", i); + } + } + if (agehash_size(t) != n) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: size %u != %u after re-inserts", + agehash_size(t), n); + } + + /* Iterate and count. */ + seen = 0; + agehash_iter_init(t, &it); + while (agehash_iter_next(&it)) + { + selftest_payload *pp = it.payload; + uint64 k; + memcpy(&k, it.key, sizeof(uint64)); + if (pp->mirror_key != k) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: iter payload mismatch at seen=%u", seen); + } + seen++; + } + if (seen != n) + { + MemoryContextDelete(mcxt); + return psprintf("FAIL: iter saw %u of %u", seen, n); + } + + /* Freeze and confirm lookups still work. */ + agehash_freeze(t); + if (!agehash_is_frozen(t)) + { + MemoryContextDelete(mcxt); + return "FAIL: agehash_is_frozen returned false after freeze"; + } + { + uint64 k = ((uint64) 0xa5a5 << 48) | 1; + p = (selftest_payload *) agehash_lookup(t, &k); + if (p == NULL) + { + MemoryContextDelete(mcxt); + return "FAIL: lookup failed after freeze"; + } + } + + MemoryContextDelete(mcxt); + return NULL; /* OK */ +} + +const char * +agehash_self_test(void) +{ + static const struct { uint32 n; uint32 hint; } cases[] = { + { 1, 0 }, + { 7, 0 }, + { 8, 0 }, + { 9, 0 }, + { 63, 0 }, + { 64, 0 }, + { 65, 0 }, + { 1023, 0 }, /* forces grow from 64 floor */ + { 1024, 0 }, + { 1025, 0 }, + { 10000, 0 }, /* forces multiple grows */ + { 10000, 8192 }, /* with capacity hint, no grow expected */ + { 50000, 0 }, /* larger; multiple grows */ + { 1000000, 0 }, /* exercises grow at multi-MB allocations */ + /* + * NOTE: this set is bounded so 'make installcheck' completes + * quickly. The library has been manually verified up to 256M + * entries (multi-GiB slot arrays via MemoryContextAllocHuge). + */ + }; + const size_t ncases = sizeof(cases) / sizeof(cases[0]); + size_t i; + + for (i = 0; i < ncases; i++) + { + const char *r = selftest_run_one(CurrentMemoryContext, + cases[i].n, cases[i].hint); + if (r != NULL) + return psprintf("%s [n=%u hint=%u]", r, cases[i].n, cases[i].hint); + } + return "OK"; +} + +/* ------------------------------------------------------------------------- */ +/* SQL-callable wrapper: SELECT ag_catalog._agehash_self_test(); */ + +PG_FUNCTION_INFO_V1(_agehash_self_test); + +Datum +_agehash_self_test(PG_FUNCTION_ARGS) +{ + const char *r = agehash_self_test(); + PG_RETURN_TEXT_P(cstring_to_text(r)); +} diff --git a/src/include/utils/age_global_graph.h b/src/include/utils/age_global_graph.h index 92044fc7e..d68530a91 100644 --- a/src/include/utils/age_global_graph.h +++ b/src/include/utils/age_global_graph.h @@ -22,6 +22,25 @@ #include "utils/age_graphid_ds.h" +/* + * Flat dynamic-array adjacency container for vertex edges. Replaces a + * linked-list (ListGraphId) of GraphIdNodes for vertex_entry::edges_*. + * + * Storage: a single palloc'd graphid array, doubled on growth. The struct + * itself is embedded by value in vertex_entry so that the (array, size, + * capacity) triple lives in the same cache line as the surrounding entry + * fields, saving one indirection on the DFS hot path. + * + * Empty arrays carry array == NULL, size == 0, capacity == 0 and incur no + * allocation until the first append. + */ +typedef struct VertexEdgeArray +{ + graphid *array; /* contiguous edge graphid array; NULL when empty */ + int32 size; /* number of edges currently stored */ + int32 capacity; /* allocated capacity (in graphid slots) */ +} VertexEdgeArray; + /* * We declare the graph nodes and edges here, and in this way, so that it may be * used elsewhere. However, we keep the contents private by defining it in @@ -46,13 +65,27 @@ ListGraphId *get_graph_vertices(GRAPH_global_context *ggctx); vertex_entry *get_vertex_entry(GRAPH_global_context *ggctx, graphid vertex_id); edge_entry *get_edge_entry(GRAPH_global_context *ggctx, graphid edge_id); + +/* + * Variant of get_edge_entry that accepts a precomputed hash value, allowing + * the same hash to be reused across multiple lookups of the same graphid + * (e.g. edge_state_hashtable + edge_hashtable in the VLE DFS hot loop). + */ +edge_entry *get_edge_entry_with_hash(GRAPH_global_context *ggctx, + graphid edge_id, uint32 hashvalue); /* vertex entry accessor functions*/ graphid get_vertex_entry_id(vertex_entry *ve); -ListGraphId *get_vertex_entry_edges_in(vertex_entry *ve); -ListGraphId *get_vertex_entry_edges_out(vertex_entry *ve); -ListGraphId *get_vertex_entry_edges_self(vertex_entry *ve); Oid get_vertex_entry_label_table_oid(vertex_entry *ve); Datum get_vertex_entry_properties(vertex_entry *ve); + +/* + * Flat-array adjacency accessors. Returned pointer is into the entry's + * embedded VertexEdgeArray and is therefore non-NULL for a valid entry, + * but the underlying VertexEdgeArray::array may be NULL when size == 0. + */ +VertexEdgeArray *get_vertex_entry_edges_out_array(vertex_entry *ve); +VertexEdgeArray *get_vertex_entry_edges_in_array(vertex_entry *ve); +VertexEdgeArray *get_vertex_entry_edges_self_array(vertex_entry *ve); /* edge entry accessor functions */ graphid get_edge_entry_id(edge_entry *ee); Oid get_edge_entry_label_table_oid(edge_entry *ee); @@ -65,6 +98,16 @@ uint64 get_graph_version(Oid graph_oid); void increment_graph_version(Oid graph_oid); Oid get_graph_oid_for_table(Oid table_oid); +/* + * Fast hash function for graphid (int64) keys used in dynahash tables. + * Replaces tag_hash with the MurmurHash3 fmix64 finalizer for better + * distribution and lower instruction count on modern x86_64. + */ +uint32 graphid_hash(const void *key, Size keysize); + +/* Equality predicate for graphid (int64) keys; agehash_keyeq_fn signature. */ +bool graphid_keyeq(const void *a, const void *b, Size keysize); + /* Shared memory initialization for PG < 17 (shmem_request_hook path) */ #if PG_VERSION_NUM < 170000 void age_graph_version_shmem_request(void); diff --git a/src/include/utils/agehash.h b/src/include/utils/agehash.h new file mode 100644 index 000000000..7b22594dc --- /dev/null +++ b/src/include/utils/agehash.h @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * agehash.h - Robin Hood open-addressing hashtable for AGE's hot-path caches. + * + * This is an internal utility used by the global graph cache (vertex and edge + * tables) to replace dynahash on the lookup-dominated read-only-after-build + * path. Each lookup traverses a single contiguous slot array with no chain + * pointer-chasing, which on AGE workloads roughly halves lookup latency + * relative to dynahash (see Stage 5 microbench). + * + * Mode: only AGEHASH_INLINE is supported by this initial revision. INLINE + * stores the payload directly in the slot, suitable for tables that are + * never mutated after the build phase. A future revision will add an + * AGEHASH_INDIRECT mode for tables that support insert and pointer-stable + * payloads during queries. + * + * Memory: every allocation lives in a caller-supplied MemoryContext. Free is + * a single MemoryContextDelete by the caller; agehash itself never frees + * piecewise. This makes leak-on-elog impossible: PG unwinds the context. + * + * Capacity: always a power of two; grown by doubling when size exceeds + * AGEHASH_MAX_LOAD * capacity. After agehash_freeze() is called, all + * insert paths are forbidden (asserted in DEBUG builds), which guarantees + * that lookups can never observe a partially-rehashed structure. + */ + +#ifndef AG_AGEHASH_H +#define AG_AGEHASH_H + +#include "postgres.h" +#include "utils/memutils.h" +#include "utils/palloc.h" + +/* Sentinel probe distance marking an empty slot. */ +#define AGEHASH_EMPTY 0xFFFFu + +/* + * Load factor above which we grow. + * + * 0.85 balances three goals on AGE's hot tables: + * - Memory: the edge_table on SF3 is ~52M entries; at 0.85 the slot array + * is ~67M slots which is roughly the same total bytes as the dynahash + * bucket array + per-entry HASHELEMENT chain headers it replaces. + * - Probe distance: Robin Hood at 0.85 still keeps average probes near 1 + * and max probes well below the 0xFE00 overflow guard. + * - Grow cadence: a higher threshold means fewer doublings during the + * edge cache build (each doubling rehashes the entire table). + * + * If you change this, re-run the rh_microbench harness on the VM and the + * SF3 paired benchmark; both are sensitive to the load factor. + */ +#define AGEHASH_MAX_LOAD 0.85 + +/* + * Caller-supplied hash callback. keysize is constant for a given table; we + * still pass it so callers can reuse one function across multiple tables + * with different key types if desired. + */ +typedef uint32 (*agehash_hash_fn)(const void *key, Size keysize); + +/* Caller-supplied key-equality callback. Returns true iff a == b. */ +typedef bool (*agehash_keyeq_fn)(const void *a, const void *b, Size keysize); + +/* + * Layout mode. Only INLINE is implemented in this revision; INDIRECT is + * declared so the public enum values stay stable when it lands. + */ +typedef enum AgeHashMode +{ + AGEHASH_INLINE = 0, + AGEHASH_INDIRECT = 1 +} AgeHashMode; + +/* Opaque table handle. */ +typedef struct AgeHashTable AgeHashTable; + +/* + * Slot layout (INLINE mode), packed: + * + * offset 0 : uint16 probe_dist (AGEHASH_EMPTY == empty) + * offset 2 : uint16 _reserved (future flags / tombstones) + * offset 4 : uint32 _pad (force key to 8-byte alignment) + * offset 8 : key (key_size bytes) + * offset 8+key_size : payload (payload_size bytes) + * + * The header is 8 bytes; total slot bytes = 8 + key_size + payload_size, + * rounded up to MAXIMUM_ALIGNOF. + */ + +#define AGEHASH_SLOT_HDR_BYTES 8 +#define AGEHASH_SLOT_KEY_OFFSET AGEHASH_SLOT_HDR_BYTES + +/* + * Recover a key pointer from a payload pointer. INLINE-mode tables store + * the key immediately before the payload, so this is pure pointer + * arithmetic and does not need the table handle. The caller must know the + * key size at this site; this is the case for every AGE caller (each table + * has a single fixed key type). + */ +#define agehash_key_from_payload(payload, key_size) \ + ((const void *) ((const char *) (payload) - (Size) (key_size))) + +/* + * Construction. capacity_hint is a number of entries; the actual capacity + * will be the next power of two >= capacity_hint / AGEHASH_MAX_LOAD, with a + * floor of 64 slots. Pass 0 to let the table start at the floor. + */ +extern AgeHashTable *agehash_create_inline(MemoryContext mcxt, + Size key_size, + Size payload_size, + uint32 capacity_hint, + agehash_hash_fn hash_fn, + agehash_keyeq_fn keyeq_fn); + +/* + * Reserve / find. If the key is not present, allocates a fresh slot + * (rebalancing via Robin Hood swaps), zero-fills the payload, sets + * *found = false, and returns a pointer to the payload region. The caller + * fills it in. If the key is present, sets *found = true and returns the + * existing payload pointer. + * + * The returned payload pointer is *not* stable across subsequent + * agehash_insert calls in INLINE mode (a later insert may swap this slot). + * Callers requiring stable pointers must use INDIRECT mode (future). + * + * Asserts that the table has not been frozen (DEBUG builds). + */ +extern void *agehash_insert(AgeHashTable *t, const void *key, bool *found); + +/* Variant that accepts a precomputed hash value, skipping the hash callback. */ +extern void *agehash_insert_with_hash(AgeHashTable *t, const void *key, + uint32 hashvalue, bool *found); + +/* + * Lookup. Returns a pointer to the payload region, or NULL if absent. + * The pointer is stable as long as no further insert touches the table. + */ +extern void *agehash_lookup(AgeHashTable *t, const void *key); + +/* Variant accepting a precomputed hash value. */ +extern void *agehash_lookup_with_hash(AgeHashTable *t, const void *key, + uint32 hashvalue); + +/* + * Freeze the table: subsequent insert/grow attempts are an Assert failure + * in DEBUG and an elog(ERROR) in production. This is the contract that + * lets read-only-after-build callers hand out long-lived payload pointers. + */ +extern void agehash_freeze(AgeHashTable *t); + +/* True after agehash_freeze(); useful for caller-side asserts. */ +extern bool agehash_is_frozen(const AgeHashTable *t); + +/* Live entry count. */ +extern uint32 agehash_size(const AgeHashTable *t); + +/* Allocated slot count (capacity). */ +extern uint32 agehash_capacity(const AgeHashTable *t); + +/* + * Iteration. Usage: + * + * AgeHashIter it; + * for (agehash_iter_init(t, &it); agehash_iter_next(&it); ) + * { + * graphid k = *(graphid *) it.key; + * my_payload *p = it.payload; + * ... + * } + * + * Iteration order is unspecified. Modifying the table during iteration is + * undefined behaviour. + */ +typedef struct AgeHashIter +{ + AgeHashTable *t; + uint32 idx; + void *key; + void *payload; +} AgeHashIter; + +extern void agehash_iter_init(AgeHashTable *t, AgeHashIter *it); +extern bool agehash_iter_next(AgeHashIter *it); + +/* + * Internal self-test. Returns a NUL-terminated diagnostic string allocated + * in CurrentMemoryContext: "OK" on success, "FAIL: " on failure. + * Used by the agehash regression test. + */ +extern const char *agehash_self_test(void); + +#endif /* AG_AGEHASH_H */