在進行Mysql分庫分表后,我們很可能需要將數據同步到ES中。下面給出幾種同步的方式。
1. 使用ES Rivers插件
{ "type": "jdbc", "jdbc": { "url": "jdbc:mysql://localhost:3306/myDb", "user": "myUser", "password": "myPassword", "sql": "select * from myTable", "index": "myIndex", "type": "myType" } }
2. 使用Logstash
input { jdbc { jdbc_driver_library =>"/path/to/mysql-connector-java-5.1.34-bin.jar" jdbc_driver_class =>"com.mysql.jdbc.Driver" jdbc_connection_string =>"jdbc:mysql://localhost:3306/myDb" jdbc_user =>"myUser" jdbc_password =>"myPassword" statement =>"SELECT * FROM myTable" } } output { elasticsearch { hosts =>["localhost:9200"] index =>"myIndex" document_type =>"myType" } }
3. 自定義同步程序
public class MySync { public static void main(String[] args) { // 獲取Mysql數據源 DataSource ds = new MysqlDataSource(); ((MysqlDataSource) ds).setUrl("jdbc:mysql://localhost:3306/myDb"); ((MysqlDataSource) ds).setUser("myUser"); ((MysqlDataSource) ds).setPassword("myPassword"); // 獲取ES客戶端 TransportClient client = new PreBuiltTransportClient(Settings.EMPTY) .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300)); // 獲取Mysql數據 Connection conn = ds.getConnection(); Statement stmt = conn.createStatement(); ResultSet rs = stmt.executeQuery("SELECT * FROM myTable"); while (rs.next()) { // 填充ES數據 Mapdata = new HashMap<>(); data.put("field1", rs.getString("field1")); data.put("field2", rs.getString("field2")); data.put("field3", rs.getString("field3")); IndexResponse response = client.prepareIndex("myIndex", "myType") .setSource(data) .get(); } // 釋放資源 rs.close(); stmt.close(); conn.close(); client.close(); } }
上一篇go解析json數據