elasticsearch学习二:导入数据
温馨提示:
本文最后更新于 2022年06月10日,已超过 832 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我。
安装go-elasticsearch,gorm
go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7
我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8
go-elasticsearch和gorm操作
func EsClient() *elasticsearch.Client {
cfg := elasticsearch.Config{
Addresses: []string{
"http://127.0.0.1:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// Get cluster info
//
var r map[string]interface{}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
log.Printf("Client: %s", elasticsearch.Version)
log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
log.Println(strings.Repeat("~", 37))
return es
}
func Db() *gorm.DB {
db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
panic(err)
}
return db
}
elasticsearch导入
在7.0之后的版本,一个index只允许一个type,所以不需要额外定义type
func AddEsData(es *elasticsearch.Client, info LogInfo) {
id := info.Id
// Build the request body.
data, err := json.Marshal(info)
if err != nil {
log.Fatalf("Error marshaling document: %s", err)
}
// Set up the request object.
req := esapi.IndexRequest{
Index: "test",
DocumentID: strconv.Itoa(id),
Body: bytes.NewReader(data),
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
fmt.Println(res)
log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
//log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}
总的代码:
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
"log"
"strconv"
"strings"
"sync"
)
type LogInfo struct {
Id int \`gorm:"primary_key"\`
Type_name string \`gorm:""\`
Type string \`gorm:""\`
Sub_type string \`gorm:""\`
Sub_type_name string \`gorm:""\`
Time string \`gorm:""\`
Login_qq string \`gorm:""\`
Send_qq string \`gorm:""\`
Group string \`gorm:""\`
Content string \`gorm:""\`
Font_id string \`gorm:""\`
File string \`gorm:""\`
Being_operate_qq string \`gorm:""\`
Add_time string \`gorm:""\`
}
func EsClient() *elasticsearch.Client {
cfg := elasticsearch.Config{
Addresses: []string{
"http://127.0.0.1:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// Get cluster info
//
var r map[string]interface{}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
log.Printf("Client: %s", elasticsearch.Version)
log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
log.Println(strings.Repeat("~", 37))
return es
}
func Db() *gorm.DB {
db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
panic(err)
}
return db
}
func main() {
es := EsClient()
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
db := Db()
defer db.Close()
db.AutoMigrate(&LogInfo{})
var list = &[]LogInfo{}
lastId := 30308
start:
db.Table("log").Limit(100).Order("id ASC").Where("id > ?", lastId).Find(list)
var wg sync.WaitGroup
for _, logInfo := range *list {
wg.Add(1)
go func(info LogInfo) {
defer wg.Done()
AddEsData(es, info)
}(logInfo)
lastId = logInfo.Id
}
wg.Wait()
goto start
//fmt.Println(list)
//log.Println(res)
}
func AddEsData(es *elasticsearch.Client, info LogInfo) {
id := info.Id
// Build the request body.
data, err := json.Marshal(info)
if err != nil {
log.Fatalf("Error marshaling document: %s", err)
}
// Set up the request object.
req := esapi.IndexRequest{
Index: "test",
DocumentID: strconv.Itoa(id),
Body: bytes.NewReader(data),
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
fmt.Println(res)
log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
//log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}
正文到此结束
- 本文标签: es 编程语言
- 本文链接: https://www.php20.cn/article/374
- 版权声明: 本文由仙士可原创发布,转载请遵循《署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0)》许可协议授权