【elasticsearch-9】Sync MySQL data to Elasticsearch

原创
09/10 15:27
阅读数 34

es.go

package main

import (
	"context"
	"github.com/olivere/elastic"
	"time"
)

// Package-level Elasticsearch state shared by the rest of the program.
var (
	// esUrl is the endpoint handed to elastic.SetURL when init builds the client.
	esUrl  = "http://s10:19200"
	// ctx is the background context used for requests issued through client.
	ctx    = context.Background()
	// client is nil until init assigns it.
	client *elastic.Client
)

// init builds the package-level Elasticsearch client for esUrl and panics
// with the error text if the client cannot be created.
func init() {
	// Sniffing is switched off; the health check runs every 10 seconds and
	// requests are retried up to 5 times.
	options := []elastic.ClientOptionFunc{
		elastic.SetSniff(false),
		elastic.SetURL(esUrl),
		elastic.SetHealthcheckInterval(10 * time.Second),
		elastic.SetMaxRetries(5),
	}

	// err is declared up front so the assignment below targets the
	// package-level client instead of shadowing it.
	var err error
	if client, err = elastic.NewClient(options...); err != nil {
		panic(err.Error())
	}
}

mysql.go

package main

import (
	_ "github.com/go-sql-driver/mysql"
	"github.com/go-xorm/xorm"
)

var (
	// db is the shared xorm engine group; it is nil until init assigns it.
	db *xorm.EngineGroup
)

// init opens the MySQL engine group stored in db and panics with the error
// text if it cannot be created. SQL logging is enabled and the pool is capped
// at 5 idle / 10 open connections.
func init() {
	// err is declared up front so the assignment below targets the
	// package-level db instead of shadowing it.
	var err error

	dsns := []string{"test:test@tcp(127.0.0.1:3306)/test"}
	if db, err = xorm.NewEngineGroup("mysql", dsns); err != nil {
		panic(err.Error())
	}

	db.ShowSQL(true)
	db.SetMaxIdleConns(5)
	db.SetMaxOpenConns(10)
}

model.go

package main

import (
	"encoding/json"
	"time"
)

// StudentAnswer maps one row of the student_answer table. The json tags give
// the snake_case field names used whenever the value is JSON-encoded.
type StudentAnswer struct {
	Id            int64     `json:"id"`
	Qid           int64     `json:"qid"`
	QuesType      int       `json:"ques_type"`
	ClassType     int       `json:"class_type"`
	LessonId      int64     `json:"lesson_id"`
	ScheduleId    int64     `json:"schedule_id"`
	IsFirstSubmit int       `json:"is_first_submit"`
	IsRight       int       `json:"is_right"`
	StuAnswer     string    `json:"stu_answer"`
	Scores        string    `json:"scores"`
	StudentId     int64     `json:"student_id"`
	CreatedAt     time.Time `json:"created_at"`
	UpdatedAt     time.Time `json:"updated_at"`
}

// TableName returns the name of the database table backing StudentAnswer.
func (*StudentAnswer) TableName() string {
	return "student_answer"
}

// String returns the JSON encoding of sa.
//
// If encoding fails (for example a timestamp whose year is outside
// [0, 9999]), a short diagnostic string containing the error is returned
// instead of an empty string, so the failure is visible in logs.
func (sa *StudentAnswer) String() string {
	buf, err := json.Marshal(sa)
	if err != nil {
		return "StudentAnswer(marshal error: " + err.Error() + ")"
	}
	return string(buf)
}

main.go

package main

import (
	"fmt"
	"github.com/olivere/elastic"
	"googo.io/goo/log"
	"sync"
)

// Package-level state for the MySQL -> Elasticsearch copy.
var (
	// MaxId is the id of the last row handed off so far; getRows uses it as
	// the exclusive lower bound for the next page.
	MaxId    = int64(0)
	// pageSize is the row limit getRows applies to each query.
	pageSize = 1000
	// index is the Elasticsearch index the bulk requests write to.
	index    = "stu-answer"
	// wg lets main wait for its goroutines.
	wg       sync.WaitGroup
	// NOTE(review): ch is not referenced anywhere in the visible code.
	ch       = make(chan int64, 1000)
	// data is an unbuffered channel carrying one page of rows per send from
	// the reading goroutine to the indexing goroutine.
	data     = make(chan []StudentAnswer)
)

// init currently does nothing. The disabled calls below would delete and then
// create the target index; their results are not checked.
func init() {
	// client.DeleteIndex(index).Do(ctx)
	// client.CreateIndex(index).Do(ctx)
}

// main copies student_answer rows from MySQL into the Elasticsearch index in
// pages. One goroutine reads pages with getRows and sends them on data; a
// second goroutine turns each page into a single bulk index request, using
// the row id as the document id. main returns once both goroutines finish.
func main() {
	// Reader: fetch pages until getRows yields nothing (end of table or a
	// query error, which getRows has already logged).
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Closing data ends the indexer's range loop; without it the
		// indexer would block forever and wg.Wait would never return.
		defer close(data)
		for {
			rows := getRows()
			fmt.Println(len(rows))
			if len(rows) == 0 {
				return
			}
			// Advance the cursor before handing the page off so the next
			// query starts after the last row of this page.
			MaxId = rows[len(rows)-1].Id
			data <- rows
		}
	}()

	// Indexer: one bulk request per page, until the reader closes data.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for rows := range data {
			bulk := client.Bulk().Index(index)
			for _, row := range rows {
				bulk.Add(elastic.NewBulkIndexRequest().Id(fmt.Sprintf("%d", row.Id)).Doc(row))
			}
			resp, err := bulk.Do(ctx)
			if err != nil {
				gooLog.Error(err.Error())
				continue
			}
			// A bulk call can succeed as a whole while individual items are
			// rejected; report those instead of dropping them silently.
			if resp != nil && resp.Errors {
				gooLog.Error(fmt.Sprintf("bulk index: %d of %d items failed", len(resp.Failed()), len(rows)))
			}
		}
	}()

	wg.Wait()
}

// getRows returns the next page of at most pageSize rows whose id is greater
// than MaxId, in ascending id order. It returns nil after logging if the
// query fails, and an empty slice when no rows remain.
func getRows() []StudentAnswer {
	rows := make([]StudentAnswer, 0, pageSize)
	// The explicit ascending order is what makes the last row of the page
	// carry the largest id; without ORDER BY the database may return rows in
	// any order and the caller's cursor could skip or repeat rows.
	err := db.Where("id > ?", MaxId).Asc("id").Limit(pageSize).Find(&rows)
	if err != nil {
		gooLog.Error(err.Error())
		return nil
	}
	return rows
}

test/main.go

package main

import (
	"encoding/json"
	"fmt"
	"github.com/olivere/elastic"
)

// SELECT COUNT(*) AS cnt FROM (SELECT id FROM `student_answer`
// WHERE lesson_id IN (633,634,635,636,665,668) AND is_first_submit = 1 AND class_type = 2
// GROUP BY `student_id`) AS a

var (
	// index is the Elasticsearch index that main counts and searches.
	index = "jy_edu_student_answer"
)

// main runs three requests against index and prints each result: the total
// document count, the count of documents matching the filter, and a terms
// aggregation on student_id over the matching documents (no hits returned).
//
// NOTE(review): the SQL in the comment above counts distinct student_id
// values, while a terms aggregation returns per-value buckets — confirm this
// is the intended equivalent.
func main() {
	// All three conditions must hold: lesson in the given set, first
	// submission, class type 2.
	q := elastic.NewBoolQuery().Must(
		elastic.NewTermsQuery("lesson_id", 633, 634, 635, 636, 665, 668),
		elastic.NewMatchQuery("is_first_submit", "1"),
		elastic.NewMatchQuery("class_type", "2"),
	)

	agg := elastic.NewTermsAggregation().Field("student_id")

	cnt, err := client.Count().Index(index).Do(ctx)
	fmt.Println(cnt, err)

	cnt, err = client.Count().Index(index).Query(q).Do(ctx)
	fmt.Println(cnt, err)

	// Size(0) suppresses the hits; only the aggregation is of interest.
	rst, err := client.Search().Index(index).Aggregation("agg_student_id", agg).Query(q).Size(0).Do(ctx)
	if err != nil {
		// Report the failure instead of printing a bare "null" result.
		fmt.Println(err)
		return
	}
	buf, err := json.Marshal(rst)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(string(buf))
}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部