Skip to content

feat: Create and alias index #813

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions om/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package om

import (
"context"
"fmt"
"github.com/oklog/ulid/v2"
"reflect"
"strconv"
"strings"
"time"

"github.com/redis/rueidis"
Expand Down Expand Up @@ -146,6 +148,77 @@ func (r *HashRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
return r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(r.idx).OnHash().Prefix(1).Prefix(r.prefix+":").Schema())).Error()
}

// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
func (r *HashRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
alias := r.idx

var currentIndex string
aliasExists := false
infoCmd := r.client.B().FtInfo().Index(alias).Build()
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
if err != nil {
if strings.Contains(err.Error(), "Unknown index name") {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should work @rueian

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok. please write tests for that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, will do the changes in JSON as well and then write UTs

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @SoulPancake, any update on this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @rueian Sorry been busy with work, Will do this tomorrow

// This is expected when the alias doesn't exist yet
aliasExists = false
} else {
// This is an unexpected error (network, connection, etc.)
return fmt.Errorf("failed to check if index exists: %w", err)
}
} else {
aliasExists = true
}

if aliasExists {
message, ok := infoResp["index_name"]
if !ok {
return fmt.Errorf("index_name not found in FT.INFO response")
}

currentIndex, err = message.ToString()
if err != nil {
return fmt.Errorf("failed to convert index_name to string: %w", err)
}
}

newIndex := alias + "_v1"
if aliasExists && currentIndex != "" {
// Find the last occurrence of "_v" followed by digits
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about this? @rueian

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Please update for the JSONRepository as well.

lastVersionIndex := strings.LastIndex(currentIndex, "_v")
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
versionStr := currentIndex[lastVersionIndex+2:]
if version, err := strconv.Atoi(versionStr); err == nil {
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
}
}
}

// Create the new index
if err := r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(newIndex).OnHash().Prefix(1).Prefix(r.prefix+":").Schema())).Error(); err != nil {
return err
}

// Update or add the alias
var aliasErr error
if aliasExists {
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
} else {
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
}

if aliasErr != nil {
return fmt.Errorf("failed to update alias: %w", aliasErr)
}

// Drop the old index if it exists and differs from the new one
if aliasExists && currentIndex != "" && currentIndex != newIndex {
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
return fmt.Errorf("failed to drop old index: %w", err)
}
}

return nil
}

// DropIndex uses FT.DROPINDEX from the RediSearch module to drop index whose name is `hashidx:{prefix}`
func (r *HashRepository[T]) DropIndex(ctx context.Context) error {
return r.client.Do(ctx, r.client.B().FtDropindex().Index(r.idx).Build()).Error()
Expand Down
49 changes: 49 additions & 0 deletions om/hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,3 +479,52 @@ func TestNewHashRepositoryTTL(t *testing.T) {
}
})
}

// TestCreateAndAliasIndex tests the CreateAndAliasIndex method of HashRepository.
func TestCreateAndAliasIndex(t *testing.T) {
ctx := context.Background()

client := setup(t)
client.Do(ctx, client.B().Flushall().Build())
defer client.Close()

repo := NewHashRepository("hashalias", HashTestStruct{}, client)

t.Run("CreateAndAliasIndex", func(t *testing.T) {
err := repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
return schema.FieldName("Val").Text().Build()
})
if err != nil {
t.Fatalf("failed to create and alias index: %v", err)
}

// Verify the alias points to the correct index
infoCmd := client.B().FtInfo().Index(repo.IndexName()).Build()
infoResp, err := client.Do(ctx, infoCmd).ToMap()
if err != nil {
t.Fatalf("failed to fetch index info: %v", err)
}

if _, ok := infoResp["index_name"]; !ok {
t.Fatalf("index_name not found in FT.INFO response")
}

// Create a new version of the index and alias it
err = repo.CreateAndAliasIndex(ctx, func(schema FtCreateSchema) rueidis.Completed {
return schema.FieldName("Val").Text().Build()
})
if err != nil {
t.Fatalf("failed to create and alias new index version: %v", err)
}

// Verify the alias points to the new index
infoResp, err = client.Do(ctx, infoCmd).ToMap()
if err != nil {
t.Fatalf("failed to fetch updated index info: %v", err)
}

if _, ok := infoResp["index_name"]; !ok {
t.Fatalf("index_name not found in updated FT.INFO response")
}
})
}
69 changes: 69 additions & 0 deletions om/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package om
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
Expand Down Expand Up @@ -145,6 +146,74 @@ func (r *JSONRepository[T]) CreateIndex(ctx context.Context, cmdFn func(schema F
return r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(r.idx).OnJson().Prefix(1).Prefix(r.prefix+":").Schema())).Error()
}

// CreateAndAliasIndex creates a new index, aliases it, and drops the old index if needed.
func (r *JSONRepository[T]) CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error {
alias := r.idx

var currentIndex string
aliasExists := false
infoCmd := r.client.B().FtInfo().Index(alias).Build()
infoResp, err := r.client.Do(ctx, infoCmd).ToMap()
if err != nil {
if strings.Contains(err.Error(), "Unknown index name") {
aliasExists = false
} else {
return fmt.Errorf("failed to check if index exists: %w", err)
}
} else {
aliasExists = true
}

if aliasExists {
message, ok := infoResp["index_name"]
if !ok {
return fmt.Errorf("index_name not found in FT.INFO response")
}

currentIndex, err = message.ToString()
if err != nil {
return fmt.Errorf("failed to convert index_name to string: %w", err)
}
}

newIndex := alias + "_v1"
if aliasExists && currentIndex != "" {
lastVersionIndex := strings.LastIndex(currentIndex, "_v")
if lastVersionIndex != -1 && lastVersionIndex+2 < len(currentIndex) {
versionStr := currentIndex[lastVersionIndex+2:]
if version, err := strconv.Atoi(versionStr); err == nil {
newIndex = fmt.Sprintf("%s_v%d", alias, version+1)
}
}
}

// Create the new index
if err := r.client.Do(ctx, cmdFn(r.client.B().FtCreate().Index(newIndex).OnJson().Prefix(1).Prefix(r.prefix+":").Schema())).Error(); err != nil {
return err
}

// Update or add the alias
var aliasErr error
if aliasExists {
aliasErr = r.client.Do(ctx, r.client.B().FtAliasupdate().Alias(alias).Index(newIndex).Build()).Error()
} else {
aliasErr = r.client.Do(ctx, r.client.B().FtAliasadd().Alias(alias).Index(newIndex).Build()).Error()
}

if aliasErr != nil {
return fmt.Errorf("failed to update alias: %w", aliasErr)
}

// Drop the old index if it exists and differs from the new one
if aliasExists && currentIndex != "" && currentIndex != newIndex {
if err := r.client.Do(ctx, r.client.B().FtDropindex().Index(currentIndex).Build()).Error(); err != nil {
return fmt.Errorf("failed to drop old index: %w", err)
}
}

return nil
}

// DropIndex uses FT.DROPINDEX from the RediSearch module to drop index whose name is `jsonidx:{prefix}`
func (r *JSONRepository[T]) DropIndex(ctx context.Context) error {
return r.client.Do(ctx, r.client.B().FtDropindex().Index(r.idx).Build()).Error()
Expand Down
1 change: 1 addition & 0 deletions om/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Repository[T any] interface {
SaveMulti(ctx context.Context, entity ...*T) (errs []error)
Remove(ctx context.Context, id string) error
CreateIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error
CreateAndAliasIndex(ctx context.Context, cmdFn func(schema FtCreateSchema) rueidis.Completed) error
AlterIndex(ctx context.Context, cmdFn func(alter FtAlterIndex) rueidis.Completed) error
DropIndex(ctx context.Context) error
IndexName() string
Expand Down
Loading