在數(shù)據(jù)分析和數(shù)據(jù)挖掘的過(guò)程中,為了方便地從多個(gè)數(shù)據(jù)源中獲取和整合數(shù)據(jù),我們通常需要在一定時(shí)間間隔內(nèi)從數(shù)據(jù)源(例如ERP、CRM系統(tǒng)等)定時(shí)推送數(shù)據(jù)到中間庫(kù)(例如Mysql、PostgreSQL等)。本文介紹如何使用Mysql實(shí)現(xiàn)定時(shí)推送數(shù)據(jù)到中間庫(kù)。
首先,我們需要在Mysql中創(chuàng)建一個(gè)用于保存數(shù)據(jù)的表,例如:
CREATE TABLE `my_data_table` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) COLLATE utf8mb4_unicode_ci DEFAULT NULL, `age` int(11) DEFAULT NULL, `gender` varchar(10) COLLATE utf8mb4_unicode_ci DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
接著,我們需要編寫(xiě)一個(gè)用于將數(shù)據(jù)從數(shù)據(jù)源中查詢(xún)出來(lái)并插入到Mysql表中的腳本。這里以Python為例,使用pymysql庫(kù)連接數(shù)據(jù)源和Mysql,并編寫(xiě)以下代碼:
import pymysql # 連接數(shù)據(jù)源和Mysql source_db = pymysql.connect( host='source_host', user='source_username', password='source_password', database='source_database', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) target_db = pymysql.connect( host='target_host', user='target_username', password='target_password', database='target_database', charset='utf8mb4', cursorclass=pymysql.cursors.DictCursor ) // 查詢(xún)數(shù)據(jù)源 source_cursor = source_db.cursor() source_sql = "SELECT name, age, gender FROM my_source_table" source_cursor.execute(source_sql) results = source_cursor.fetchall() # 插入到Mysql target_cursor = target_db.cursor() for result in results: target_sql = "INSERT INTO my_data_table (name, age, gender) VALUES (%s, %s, %s)" target_cursor.execute(target_sql, (result['name'], result['age'], result['gender'])) target_db.commit() source_cursor.close() source_db.close() target_cursor.close() target_db.close()
最后,我們需要將以上腳本以定時(shí)任務(wù)的形式運(yùn)行。可以使用Linux的crontab,也可以使用Python的schedule庫(kù)。以下為使用Python的schedule庫(kù)實(shí)現(xiàn):
import schedule import time def job(): // 執(zhí)行之前編寫(xiě)的查詢(xún)數(shù)據(jù)源并插入到Mysql的腳本 schedule.every(3).hours.do(job) while True: schedule.run_pending() time.sleep(1)
以上代碼表示每隔3個(gè)小時(shí)執(zhí)行一次job函數(shù)。可以根據(jù)實(shí)際需求來(lái)進(jìn)行修改。