Stein Kåre Skytteren

Migrating an Elasticsearch index

Categories: code
#elasticsearch #elastic4s #scala

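Migrating an Elasticsearch index can be challenging. Getting the mappings right matters, because they are the foundation for doing anything beyond 'Hello World!' with search. Elasticsearch is quite good at guessing the type of a field from the documents you index, but a guess is only a starting point. Once a field is mapped, its type generally cannot be changed in place, so a better mapping means creating a new index and moving the existing data into it.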
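To make the point about guessing concrete, here is a deliberately simplified toy in plain Scala. It guesses a field type from a single sample value and compares the guess with the type you actually want. `GuessMapping` and its rules are hypothetical, made up for illustration; they are not Elasticsearch's dynamic-mapping algorithm.

```scala
// Toy model of guessing field types from sample values.
// GuessMapping is a hypothetical helper; its rules are simplified assumptions
// for illustration and are NOT Elasticsearch's dynamic mapping.
object GuessMapping {

  // Guess a field type from one sample value.
  def guess(value: Any): String = value match {
    case _: Boolean                                      => "boolean"
    case _: Int | _: Long                                => "long"
    case _: Float | _: Double                            => "double"
    case s: String if s.matches("""\d{4}-\d{2}-\d{2}""") => "date"
    case _: String                                       => "string"
    case _                                               => "object"
  }

  // Guess a type for every field in a flat document.
  def guessAll(doc: Map[String, Any]): Map[String, String] =
    doc.map { case (field, value) => (field, guess(value)) }

  // Fields whose guessed type differs from the wanted type,
  // returned as field -> (guessed, wanted).
  def mismatches(guessed: Map[String, String],
                 wanted: Map[String, String]): Map[String, (String, String)] =
    wanted.collect {
      case (field, want) if guessed.get(field).exists(_ != want) =>
        (field, (guessed(field), want))
    }
}
```

A postal code sent as the number 150 is guessed as `long`, while you most likely want it stored as a string. Fixing a mismatch like that means changing the mapping, which is exactly when a migration is needed.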

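The UpdateIndex class below uses Elastic4s, a Scala DSL on top of the regular Elasticsearch Java API. Code written with Elastic4s reads a lot closer to the Elasticsearch REST API than code written against the Java API. If the index is missing or empty, UpdateIndex simply creates it with the given mappings. If it already holds documents, UpdateIndex creates a new timestamped index with the new mappings, moves the data over from the old index, deletes the old index and adds an alias with the old name pointing at the new one.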
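The decision and the four steps can be modelled without a cluster. The sketch below is a toy, in-memory stand-in: `FakeCluster` and `Migration.migrate` are hypothetical names, documents are plain strings, and it assumes that deleting by the old name removes the index the alias currently points to. It only shows the shape of the migration, not how Elasticsearch stores anything.

```scala
// Toy, in-memory model of the steps UpdateIndex runs against a real cluster.
// FakeCluster and Migration are hypothetical names for illustration only.
final case class FakeCluster(
    indices: Map[String, Vector[String]], // concrete index name -> documents
    aliases: Map[String, String]          // alias -> concrete index name
) {
  // Resolve a name the way a search would: an alias points at its index.
  def resolve(name: String): Option[String] =
    aliases.get(name).orElse(Some(name).filter(indices.contains))

  // Documents visible under a name (index or alias); empty if missing.
  def docs(name: String): Vector[String] =
    resolve(name).flatMap(indices.get).getOrElse(Vector.empty)
}

object Migration {
  def migrate(cluster: FakeCluster, indexName: String, now: Long): FakeCluster = {
    val existing = cluster.docs(indexName)
    if (existing.isEmpty) {
      // Nothing to keep: create the index directly under its own name.
      cluster.copy(indices = cluster.indices + (indexName -> Vector.empty[String]))
    } else {
      val newIndex = indexName + now // timestamped name, as in initIndex
      // Toy assumption: deleting "indexName" removes the index behind it.
      val old = cluster.resolve(indexName).get
      FakeCluster(
        // "reindex" copies every document, then the old index is deleted
        indices = cluster.indices - old + (newIndex -> existing),
        // the old name becomes an alias for the new index
        aliases = cluster.aliases + (indexName -> newIndex)
      )
    }
  }
}
```

Because readers keep using the old name, which now resolves through the alias, running the migration again later works the same way: a fresh timestamped index replaces the one behind the alias.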

Code

package bootstrap

import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.control.NonFatal

import org.elasticsearch.indices.IndexMissingException

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl.aliases
import com.sksamuel.elastic4s.ElasticDsl.create
import com.sksamuel.elastic4s.ElasticDsl.deleteIndex
import com.sksamuel.elastic4s.ElasticDsl.matchall
import com.sksamuel.elastic4s.ElasticDsl.search
import com.sksamuel.elastic4s.mappings.MappingDefinition
import com.sksamuel.elastic4s.mappings._

class UpdateIndex(client: ElasticClient, indexName: String, types: Seq[MappingDefinition])(implicit ec: ExecutionContext) {

  def info(msg: String): Unit = {
    println(msg) // You can do better :-)
  }

  def initIndex(): Future[Unit] = {
    info("init index")
    client.execute {
      search (indexName) query matchall
    }.map(_.getHits.getTotalHits)
      // A missing index simply means there is nothing to migrate.
      .recover { case _: IndexMissingException => 0L }
      .flatMap { numberOfDocs =>
        info("number of docs: " + numberOfDocs)
        if (numberOfDocs == 0) {
          // No data to keep: create the index directly under its own name.
          info("create index")
          client.execute {
            create index indexName mappings (types: _*)
          }.map(_ => ())
        } else {
          // Data exists: create a timestamped index with the new mappings,
          // copy the documents over, delete the old index and add an alias
          // with the old name pointing at the new index.
          val newIndex = indexName + System.currentTimeMillis()
          for {
            _ <- {
              info("create brand new index")
              client.execute { create index newIndex mappings (types: _*) }
            }
            _ <- {
              info("reindex")
              client.reindex(indexName, newIndex)
            }
            _ <- client.execute(deleteIndex(indexName))
            _ <- {
              info("create aliases")
              client.execute { aliases add indexName on newIndex }
            }
            newSt <- client.execute { search (indexName) query matchall }
          } yield {
            info("done update index - Moved docs: " + newSt.getHits().getTotalHits())
          }
        }
      }
  }

  def createOrUpdateIndexes(): Unit = {
    try {
      Await.result(initIndex(), 3.minutes)
      info("DONE")
    } catch {
      case NonFatal(t) => t.printStackTrace()
    }
  }

}
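The reindex step is delegated to the Elastic4s client. Conceptually, copying an index means reading the source in batches (for example with a scroll) and writing each batch with one bulk request. The sketch below shows only that batching idea in plain Scala; `ChunkedCopy.copyInChunks` and its `writeBulk` callback are hypothetical names, and this is not Elastic4s's implementation.

```scala
// Hypothetical sketch of batch copying: read the source in fixed-size chunks
// and hand each chunk to a bulk writer. Not part of Elastic4s.
object ChunkedCopy {
  def copyInChunks[A](source: Iterator[A], chunkSize: Int)(writeBulk: Seq[A] => Unit): Int = {
    require(chunkSize > 0, "chunkSize must be positive")
    var copied = 0
    source.grouped(chunkSize).foreach { chunk =>
      writeBulk(chunk) // one bulk request per chunk
      copied += chunk.size
    }
    copied // total number of documents handed to writeBulk
  }
}
```

Copying 1234 documents with a chunk size of 500 produces three bulk writes of 500, 500 and 234 documents. A larger chunk size means fewer round trips but bigger requests, so it is a trade-off worth tuning for the size of your documents.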

Disclaimer

This code has only been used on a small index on isitdown.no, but it works great there. So handle with care. Good feedback is appreciated.
