原创

elasticsearch学习二:导入数据

温馨提示:
本文最后更新于 2022年06月10日,已超过 243 天没有更新。若文章内的图片失效(无法正常加载),请留言反馈或直接联系我

安装go-elasticsearch,gorm

go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7

我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8

go-elasticsearch和gorm操作

func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: \[\]string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info
   //
   var r map\[string\]interface{}
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Check response status
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }
   // Deserialize the response into a map.
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }
   // Print client and server version numbers.
   log.Printf("Client: %s", elasticsearch.Version)
   log.Printf("Server: %s", r\["version"\].(map\[string\]interface{})\["number"\])
   log.Println(strings.Repeat("~", 37))
   
   return es
}

func Db() *gorm.DB {
   db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
   if err != nil {
      panic(err)
   }
   return db
}

elasticsearch导入

在7.0之后的版本,一个index只允许一个type,所以不需要额外定义type

func AddEsData(es *elasticsearch.Client, info LogInfo) {
   id := info.Id
   // Build the request body.
   data, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up the request object.
   req := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(id),
      Body:       bytes.NewReader(data),
      Refresh:    "true",
   }

   // Perform the request with the client.
   res, err := req.Do(context.Background(), es)

   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   if res.IsError() {
      fmt.Println(res)
      log.Printf("\[%s\] Error indexing document ID=%d", res.Status(), info.Id)
   } else {
      // Deserialize the response into a map.
      var r map\[string\]interface{}
      if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
         log.Printf("Error parsing the response body: %s", err)
      } else {
         // Print the response status and indexed document version.
         //log.Printf("\[%s\] %s; version=%d", res.Status(), r\["result"\], int(r\["_version"\].(float64)))
      }
   }
}

总的代码:

package main

import (
   "bytes"
   "context"
   "encoding/json"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "github.com/elastic/go-elasticsearch/v7/esapi"
   "github.com/jinzhu/gorm"
   _ "github.com/jinzhu/gorm/dialects/mysql"
   "log"
   "strconv"
   "strings"
   "sync"
)

type LogInfo struct {
   Id               int    \`gorm:"primary_key"\`
   Type_name        string \`gorm:""\`
   Type             string \`gorm:""\`
   Sub_type         string \`gorm:""\`
   Sub\_type\_name    string \`gorm:""\`
   Time             string \`gorm:""\`
   Login_qq         string \`gorm:""\`
   Send_qq          string \`gorm:""\`
   Group            string \`gorm:""\`
   Content          string \`gorm:""\`
   Font_id          string \`gorm:""\`
   File             string \`gorm:""\`
   Being\_operate\_qq string \`gorm:""\`
   Add_time         string \`gorm:""\`
}

func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: \[\]string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info
   //
   var r map\[string\]interface{}
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Check response status
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }
   // Deserialize the response into a map.
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }
   // Print client and server version numbers.
   log.Printf("Client: %s", elasticsearch.Version)
   log.Printf("Server: %s", r\["version"\].(map\[string\]interface{})\["number"\])
   log.Println(strings.Repeat("~", 37))

   return es
}

func Db() *gorm.DB {
   db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
   if err != nil {
      panic(err)
   }
   return db
}

func main() {
   es := EsClient()
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   db := Db()
   defer db.Close()
   db.AutoMigrate(&LogInfo{})

   var list = &\[\]LogInfo{}
   lastId := 30308
start:
   db.Table("log").Limit(100).Order("id ASC").Where("id > ?", lastId).Find(list)
   var wg sync.WaitGroup
   for _, logInfo := range *list {
      wg.Add(1)
      go func(info LogInfo) {
         defer wg.Done()
         AddEsData(es, info)
      }(logInfo)
      lastId = logInfo.Id
   }
   wg.Wait()
   goto start
   //fmt.Println(list)
   //log.Println(res)
}

func AddEsData(es *elasticsearch.Client, info LogInfo) {
   id := info.Id
   // Build the request body.
   data, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up the request object.
   req := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(id),
      Body:       bytes.NewReader(data),
      Refresh:    "true",
   }

   // Perform the request with the client.
   res, err := req.Do(context.Background(), es)

   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   if res.IsError() {
      fmt.Println(res)
      log.Printf("\[%s\] Error indexing document ID=%d", res.Status(), info.Id)
   } else {
      // Deserialize the response into a map.
      var r map\[string\]interface{}
      if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
         log.Printf("Error parsing the response body: %s", err)
      } else {
         // Print the response status and indexed document version.
         //log.Printf("\[%s\] %s; version=%d", res.Status(), r\["result"\], int(r\["_version"\].(float64)))
      }
   }
}
正文到此结束
本文目录