elasticsearch学习二:导入数据

安装go-elasticsearch,gorm

go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7

我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8


go-elasticsearch和gorm操作

func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: []string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info
   //
   var r map[string]interface{}
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Check response status
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }
   // Deserialize the response into a map.
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }
   // Print client and server version numbers.
   log.Printf("Client: %s", elasticsearch.Version)
   log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
   log.Println(strings.Repeat("~", 37))
   
   return es
}

func Db() *gorm.DB {
   db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
   if err != nil {
      panic(err)
   }
   return db
}


elasticsearch导入

在7.0之后的版本,一个index只允许一个type,所以不需要额外定义type

func AddEsData(es *elasticsearch.Client, info LogInfo) {
   id := info.Id
   // Build the request body.
   data, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up the request object.
   req := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(id),
      Body:       bytes.NewReader(data),
      Refresh:    "true",
   }

   // Perform the request with the client.
   res, err := req.Do(context.Background(), es)

   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   if res.IsError() {
      fmt.Println(res)
      log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
   } else {
      // Deserialize the response into a map.
      var r map[string]interface{}
      if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
         log.Printf("Error parsing the response body: %s", err)
      } else {
         // Print the response status and indexed document version.
         //log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
      }
   }
}


总的代码:

package main

import (
   "bytes"
   "context"
   "encoding/json"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "github.com/elastic/go-elasticsearch/v7/esapi"
   "github.com/jinzhu/gorm"
   _ "github.com/jinzhu/gorm/dialects/mysql"
   "log"
   "strconv"
   "strings"
   "sync"
)

type LogInfo struct {
   Id               int    `gorm:"primary_key"`
   Type_name        string `gorm:""`
   Type             string `gorm:""`
   Sub_type         string `gorm:""`
   Sub_type_name    string `gorm:""`
   Time             string `gorm:""`
   Login_qq         string `gorm:""`
   Send_qq          string `gorm:""`
   Group            string `gorm:""`
   Content          string `gorm:""`
   Font_id          string `gorm:""`
   File             string `gorm:""`
   Being_operate_qq string `gorm:""`
   Add_time         string `gorm:""`
}

func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: []string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info
   //
   var r map[string]interface{}
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Check response status
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }
   // Deserialize the response into a map.
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }
   // Print client and server version numbers.
   log.Printf("Client: %s", elasticsearch.Version)
   log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
   log.Println(strings.Repeat("~", 37))

   return es
}

func Db() *gorm.DB {
   db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
   if err != nil {
      panic(err)
   }
   return db
}

func main() {
   es := EsClient()
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   db := Db()
   defer db.Close()
   db.AutoMigrate(&LogInfo{})

   var list = &[]LogInfo{}
   lastId := 30308
start:
   db.Table("log").Limit(100).Order("id ASC").Where("id > ?", lastId).Find(list)
   var wg sync.WaitGroup
   for _, logInfo := range *list {
      wg.Add(1)
      go func(info LogInfo) {
         defer wg.Done()
         AddEsData(es, info)
      }(logInfo)
      lastId = logInfo.Id
   }
   wg.Wait()
   goto start
   //fmt.Println(list)
   //log.Println(res)
}

func AddEsData(es *elasticsearch.Client, info LogInfo) {
   id := info.Id
   // Build the request body.
   data, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up the request object.
   req := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(id),
      Body:       bytes.NewReader(data),
      Refresh:    "true",
   }

   // Perform the request with the client.
   res, err := req.Do(context.Background(), es)

   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer res.Body.Close()

   if res.IsError() {
      fmt.Println(res)
      log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
   } else {
      // Deserialize the response into a map.
      var r map[string]interface{}
      if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
         log.Printf("Error parsing the response body: %s", err)
      } else {
         // Print the response status and indexed document version.
         //log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
      }
   }
}


仙士可博客
请先登录后发表评论
  • 最新评论
  • 总共0条评论
  • 本站由白俊遥博客程序搭建
    © 2017-1-17 php20.cn 版权所有 ICP证:闽ICP备17001387号
  • 本网站由: 提供cdn加速/云存储服务
  • 联系邮箱:1067197739@qq.com