在Flink中,我們可以很方便地讀取MySQL數據庫的數據,并將其寫回到MySQL中。下面就是一個簡單的例子,演示如何讀取MySQL數據并寫回到MySQL中。
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.jdbc.JDBCInputFormat; import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.types.Row; import org.apache.flink.util.Collector; import java.sql.Types; public class MySQLReaderWriter { public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 設置參數 final String[] fieldNames = {"id", "name", "age"}; final TypeInformation>[] fieldTypes = {Types.INT, Types.STRING, Types.INT}; final JDBCInputFormat inputFormat = JDBCInputFormat.buildJDBCInputFormat() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/flink") .setUsername("root") .setPassword("password") .setQuery("SELECT * FROM users WHERE age >25") .setRowTypeInfo(new RowTypeInfo(fieldTypes, fieldNames)) .finish(); DataSetresultSet = env.createInput(inputFormat); // 映射結果 DataSet
>result = resultSet.map(new MapFunction >() { @Override public Tuple2
map(Row row) throws Exception { int id = (int) row.getField(0); String name = (String) row.getField(1); return new Tuple2<>(id, name); } }); // 寫回到MySQL final JDBCOutputFormat outputFormat = JDBCOutputFormat.buildJDBCOutputFormat() .setDrivername("com.mysql.jdbc.Driver") .setDBUrl("jdbc:mysql://localhost:3306/flink") .setUsername("root") .setPassword("password") .setQuery("INSERT INTO result(id, name) VALUES (?, ?)") .setSqlTypes(new int[] {Types.INTEGER, Types.VARCHAR}) .finish(); result.flatMap(new FlatMapFunction , Row>() { @Override public void flatMap(Tuple2 input, Collector output) { Row row = new Row(2); row.setField(0, input.f0); row.setField(1, input.f1); output.collect(row); } }).output(outputFormat); env.execute(); } }
以上代碼中,我們使用了JDBCInputFormat和JDBCOutputFormat來讀取和寫回MySQL數據,同時還涉及了數據類型的轉換。通過這個例子,我們可以很容易地將Flink與MySQL集成,并進行數據處理和寫回操作。