SteinKåreSkytteren

Migrating an Elasticsearch index

Categories: Code
#Elasticsearch #Elastic4s #Scala

Migrating an Elasticsearch index can be challenging. Getting the mappings right matters, because they are the foundation for anything beyond a ‘Hello World!’ search. Elasticsearch is quite good at guessing the type of a field, but that is only a starting point.

The UpdateIndex class uses Elastic4s, a Scala DSL on top of the regular Elasticsearch Java API. Elastic4s is a lot closer to the Elasticsearch REST API than the regular Java API is. It also provides a reindex helper, which the class uses to move data from the old index over to the new one.
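
Before reading the real class, it may help to see the migration steps in isolation. The sketch below models them on a plain in-memory map instead of a cluster. It is not the Elastic4s or Elasticsearch API: FakeCluster, MigrationSketch and migrate are hypothetical names made up for illustration, and documents are just strings.

```scala
// Toy in-memory stand-in for a cluster. Hypothetical names throughout;
// this does not call Elasticsearch or Elastic4s.
final case class FakeCluster(
    indices: Map[String, Vector[String]] = Map.empty, // index name -> documents
    aliases: Map[String, String] = Map.empty          // alias -> index name
) {
  // Follow an alias to the index behind it, or return the name unchanged.
  def resolve(name: String): String = aliases.getOrElse(name, name)

  // Documents visible under a name; a missing index counts as zero.
  def count(name: String): Int =
    indices.get(resolve(name)).map(_.size).getOrElse(0)
}

object MigrationSketch {
  def migrate(cluster: FakeCluster, indexName: String, now: Long): FakeCluster =
    if (cluster.count(indexName) == 0) {
      // Nothing to move: create the index under its plain name.
      cluster.copy(indices = cluster.indices.updated(indexName, Vector.empty[String]))
    } else {
      val oldIndex = cluster.resolve(indexName)
      val newIndex = indexName + now // timestamped name, as in the class below

      // 1. Create the new index (this is where new mappings would apply).
      val created = cluster.indices.updated(newIndex, Vector.empty[String])
      // 2. Reindex: copy every document from the old index to the new one.
      val reindexed = created.updated(newIndex, created(oldIndex))
      // 3. Delete the old index.
      val cleaned = reindexed - oldIndex
      // 4. Point an alias with the original name at the new index.
      cluster.copy(indices = cleaned, aliases = cluster.aliases.updated(indexName, newIndex))
    }
}
```

Calling MigrationSketch.migrate(FakeCluster(Map("sites" -> Vector("doc1", "doc2"))), "sites", 1400000000000L) leaves a single index named sites1400000000000 holding both documents, reachable through the alias sites. Clients that search by the original name do not need to change.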

Code

package bootstrap

import scala.annotation.implicitNotFound
import scala.concurrent.Await
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import org.elasticsearch.indices.IndexMissingException
import org.ladderframework.logging.Loggable

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl.aliases
import com.sksamuel.elastic4s.ElasticDsl.create
import com.sksamuel.elastic4s.ElasticDsl.deleteIndex
import com.sksamuel.elastic4s.ElasticDsl.matchall
import com.sksamuel.elastic4s.ElasticDsl.search
import com.sksamuel.elastic4s.mappings.MappingDefinition
import com.sksamuel.elastic4s.mappings._

class UpdateIndex(client: ElasticClient, indexName: String, types: Seq[MappingDefinition])(implicit ec: ExecutionContext){

	def info(msg: String): Unit = {
		println(msg) // You can do better :-)
	}

	def initIndex(): Future[Unit] = {
		info("init index")
		client.execute{
			search (indexName) query matchall
		}.map(_.getHits.getTotalHits).recover{case _: IndexMissingException => 0L}.flatMap(numberOfDocs => {
			info("number of docs: " + numberOfDocs)
			if (numberOfDocs == 0) {
				client.execute{
					info("create index")
					create index indexName mappings (types:_*)
				}.map(_ => {})
			} else {
				val date = System.currentTimeMillis()
				val newIndex = indexName + date
				for{
					_ <- client.execute{
						info("create brand new index")
						create index newIndex mappings (types:_*)
					}
					_ <- {
						info("reindex")
						client.reindex(indexName, newIndex)
					}
					_ <- {
						client.execute(deleteIndex(indexName))
					}
					_ <- {
						info("create aliases")
						client.execute{aliases add indexName on newIndex}
					}
					newSt <- client.execute{search (indexName) query matchall}
				} yield {
					info("done update index - Moved docs: " + newSt.getHits().getTotalHits())
				}
			}
		})
	}

	def createOrUpdateIndexes(): Unit = {
		try{
			val res = Await.result(initIndex(), 3.minutes)
			info("DONE: " + res)
		} catch{
			case t: Throwable => t.printStackTrace()
		}

	}

}
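
To show how the class might be called, here is a minimal usage sketch. It assumes an Elastic4s 1.x release in which ElasticClient.local and the `"type" as (...)` mapping DSL are available; check both against the version you use. The index name "sites", the type name "site" and the field names are hypothetical and only there for illustration.

```scala
import scala.concurrent.ExecutionContext.Implicits.global

import bootstrap.UpdateIndex
import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import com.sksamuel.elastic4s.mappings.FieldType.{DateType, StringType}

object MigrateSites extends App {
  // Assumption: Elastic4s 1.x, where ElasticClient.local starts an embedded node.
  // Swap in a remote client for a real cluster.
  val client = ElasticClient.local

  // Hypothetical mapping: spell out the field types instead of letting
  // Elasticsearch guess them.
  val siteMapping = "site" as (
    "url" typed StringType index "not_analyzed",
    "checkedAt" typed DateType
  )

  // Creates the index on the first run; on later runs it reindexes into a
  // new timestamped index and aliases the original name to it.
  new UpdateIndex(client, "sites", Seq(siteMapping)).createOrUpdateIndexes()
}
```

Because createOrUpdateIndexes blocks for up to three minutes and only prints failures, it fits a startup or bootstrap step better than a request path.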

Disclaimer

This code has only been used on a limited index on is it down.no, but it works great there. So handle with care. Feedback is appreciated.