-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample.go
More file actions
75 lines (62 loc) · 1.82 KB
/
example.go
File metadata and controls
75 lines (62 loc) · 1.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main
import (
"context"
"database/sql"
"fmt"
"github.com/chararch/gobatch"
"github.com/chararch/gobatch/util"
_ "github.com/go-sql-driver/mysql"
"time"
)
// mytask is the handler of the simple (non-chunked) step. It takes no
// input and only prints a fixed marker line to standard output.
func mytask() {
	const marker = "mytask executed"
	fmt.Println(marker)
}
// myReader is the item source of the chunk step. It carries no fields of its
// own; the read position lives in the step context under the key "read.num".
type myReader struct{}

// Read produces the items "value-0" through "value-99", one per call.
//
// Each call looks up the integer stored under "read.num" (passing 0 as the
// second argument, presumably the fallback when the key is absent — confirm
// against gobatch). While that number is below 100 it stores the incremented
// number back and returns the formatted item; from then on it returns
// (nil, nil), which presumably tells gobatch the input is exhausted — confirm.
func (r *myReader) Read(chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	// NOTE(review): the second result of GetInt is deliberately discarded,
	// exactly as in the original example.
	position, _ := chunkCtx.StepExecution.StepContext.GetInt("read.num", 0)
	if position >= 100 {
		return nil, nil
	}

	// Advance the stored position before handing out the current item.
	chunkCtx.StepExecution.StepContext.Put("read.num", position+1)
	item := fmt.Sprintf("value-%v", position)
	return item, nil
}
// myProcessor is the transformation stage of the chunk step. It is stateless.
type myProcessor struct{}

// Process formats the incoming item with %v and prefixes it with
// "processed-". It never returns an error and does not use chunkCtx.
func (p *myProcessor) Process(item interface{}, chunkCtx *gobatch.ChunkContext) (interface{}, gobatch.BatchError) {
	processed := fmt.Sprintf("processed-%v", item)
	return processed, nil
}
// myWriter is the output stage of the chunk step. It is stateless.
type myWriter struct{}

// Write prints the whole slice of items on one line to standard output,
// prefixed with "write: ". It always returns nil and does not use chunkCtx.
func (w *myWriter) Write(items []interface{}, chunkCtx *gobatch.ChunkContext) gobatch.BatchError {
	const layout = "write: %v\n"
	fmt.Printf(layout, items)
	return nil
}
// main wires up and runs the gobatch example job.
//
// It opens a MySQL handle and hands it to gobatch, builds a simple handler
// step ("mytask") and a chunked reader/processor/writer step ("my_step"),
// combines them into the job "my_job", registers the job and starts it with
// a JSON parameter string. It panics if the database handle cannot be
// created or the parameters cannot be serialized.
func main() {
	// Set the db that gobatch uses to store job & step execution context.
	// NOTE(review): the DSN embeds credentials; acceptable for a local
	// example only.
	db, err := sql.Open("mysql", "root:root123@tcp(127.0.0.1:3306)/gobatch?charset=utf8&parseTime=true")
	if err != nil {
		panic(err)
	}
	// Release the connection pool when main returns (also runs on panic).
	// If the job is ever switched to StartAsync below, main must wait for
	// the job before returning, otherwise the handle is closed too early.
	defer db.Close()
	gobatch.SetDB(db)

	// Build steps: one plain handler step and one chunk-oriented step that
	// is configured with a chunk size of 10.
	step1 := gobatch.NewStep("mytask").Handler(mytask).Build()
	//step2 := gobatch.NewStep("my_step").Handler(&myReader{}, &myProcessor{}, &myWriter{}).Build()
	step2 := gobatch.NewStep("my_step").Reader(&myReader{}).Processor(&myProcessor{}).Writer(&myWriter{}).ChunkSize(10).Build()

	// Build the job from both steps, in this order.
	job := gobatch.NewJob("my_job").Step(step1, step2).Build()

	// Register the job with gobatch.
	// NOTE(review): any result of Register is discarded, as in the original
	// example; confirm its signature and handle a failure if it reports one.
	gobatch.Register(job)

	// Run.
	//gobatch.StartAsync(context.Background(), job.Name(), "")
	// The "rand" value changes on every run; presumably this makes each
	// launch a distinct set of job parameters — confirm against gobatch.
	params, jsonErr := util.JsonString(map[string]interface{}{
		"rand": time.Now().Nanosecond(),
	})
	if jsonErr != nil {
		// Do not start the job with an empty or partial parameter string.
		panic(jsonErr)
	}

	// NOTE(review): the results of Start are discarded, as in the original
	// example; confirm its signature and surface a job failure if needed.
	gobatch.Start(context.Background(), job.Name(), params)
}