Square Dbsync
一种提取和加载系统分流数据库之间的数据。
它采用基于时间戳的复制是快速和易于保持运行,但有一些注意事项。最值得注意的是,它不处理以及删除(详情请参见下面的文档)。
这在广场,因为我们需要的部分(仅限于选择列),无论从MySQL和PostgreSQL数据库,连续复制到一个目标数据库与沿途的一些基本的ETL逻辑是对我们有用。现有的解决方案中没有能够充分地做到这一点。
在某些时候,你需要硬着头皮实现真正的ETL系统,但是SQ-dbsync
可你渡过难关,直到你到达那里。
dbsync是MySQL utf8mb4
干净的:它会正确处理四字节的UTF8字符像表情图案。在JRuby中,你需要配置到服务器的字符集utf8mb4
。该规格包括这个试验; 如果您在JRuby下具有不同的服务器字符集运行它们的MySQL他们就会失败。
用法
gem install sq-dbsync
require 'sq/dbsync'
include Sq::Dbsync
# Config will typically differ per environment.
config = {
sources: {
db_a: {
database: 'db_a_production',
user: 'sqdbsync-ro',
password: 'password',
host: 'db-a-host',
brand: 'mysql',
port: 3306,
},
db_b: {
database: 'db_b_production',
user: 'sqdbsync-ro',
password: 'password',
host: 'db-b-host',
brand: 'postgresl',
port: 5432,
}
},
target: {
database: 'replica',
user: 'sqdbsync',
password: 'password',
# Only localhost supported, since `LOAD DATA INFILE` is used which
# requires a shared temp directory.
host: 'localhost',
brand: 'mysql',
port: 3306,
},
# Optional configuration
logger: Loggers::Stream.new, # A graphite logger is provided, see source.
clock: ->{ Time.now.utc }, # In test env it can be useful to fix this.
error_handler: ->(e) { puts(e) } # Notify your exception system
}
# Write plans that specify how data is replicated.
DB_A_PLAN = [{
table_name: :users,
columns: [
# You must replicate the primary key.
:id,
# You must replicate a timestamp column, and it should be indexed on the
# target system.
:updated_at,
# Then whatever other columns you require.
:name,
:account_type,
:created_at,
],
indexes: {
# Indexing it on the source system is optional
index_users_on_updated_at: {:columns=>[:updated_at], :unique=>false},
},
# Basic schema transformations are supported.
db_types: {
:account_type => [:enum, %w(
bronze
silver
gold
)]
}
},{
table_name: :account_types,
source_table_name: :user_account_types,
columns: :all
}]
plans = [
[StaticTablePlan.new(DB_A_PLAN), :db_a],
[AllTablesPlan.new, :db_b]
]
manager = Manager.new(config, plans)
# Run a batch load nightly
manager.batch(ALL_TABLES)
# Run an incremental load continuously
manager.increment
# You can load a subset of tables if necessary
manager.batch([:users])
文档
计划选项
-
batch_load
是否批次负载该表中的默认批次负载。如果该表明确要求,将无论此设置加载。(默认:true) -
字符集
字符集,以创建表时使用。通过直接传递到 续集:: MySQL的数据库::#连接。只有MySQL的,忽视了Postgres的。(默认:“UTF8”) -
列
要么列的阵列复制,或:所有
指示所有列应该被复制。(需要) -
一致性
桌子上定期增量加载过程中,通过比较最近的源和目标表的计数执行基本的一致性检查。请确保您有两个表时间戳指数!开发该项目时,这是非常有用的,但老实说,现在可能是没有多大用处的---我不记得我最后一次看到错误来自于此。(默认值:false) -
db_types
哈希,使您可以从源修改目标模式。见上面用法部分的示例。(默认:{}
) -
索引
散列定义目标表所需的索引。索引是 不是从源表复制。见例如上面的章节。(默认:{}
) -
refresh_recent
一些表太大而批量负载经常,但修改已知是最近。此设置将导致数据的最后两天被丢弃的重建为夜间批处理负载的一部分。(默认值:false) -
source_table_name
允许源和目标表进行不同的命名。(默认:table_name的
配置选项) -
timestamp_table_name
一个黑客要解决Postgres的查询规划无法正确使用索引MAX()
上使用视图UNION
在幕后。如果这个描述你的源视图和基础表之一是保证包含您可以设置此值,最新的记录,它将被用于所有时间戳相关查询。如果没有,你必须提供一个支持自定义视图MAX
有一个健全的查询计划查询。(默认值:无) -
table_name的
要复制的表的名称。如果source_table_name
指定时,这个选项定义了在仅目标数据库中的表的名称。 -
primary_key
通常主键可以从源模式推断,但如果你是从视图复制,您需要使用此选项显式地指定它。应符号的阵列。(默认值:无,从源代码模式将自动检测) -
时间戳
来当作一个时间戳列。必须是会员:列
选项。(默认值:选择的updated_at
或created_at
,顺序)
处理删除
增量负荷没有检测删除的记录方式。夜间批处理负荷将重新加载所有的表,所以会有最多为期一天的周转上删除。有些表会过大,批量加载每天晚上然而,因此这不是在这种情况下很好的解决方案。
如果你有一个包含足够的数据,为您重建其他表中删除了“审计”表,那么你可以提供一个自定义子类增量加载器将能够运行这个逻辑。
class IncrementalLoadWithDeletes < Sq::Dbsync::IncrementalLoadAction
def process_deletes
if plan.table_name == :audit_logs
ExampleRecordDestroyer.run(db, registry, :audit_logs, :other_table)
end
end
end
CONFIG = {
# ...
incremental_action: IncrementalLoadWithDeletes,
}
看lib/sq/dbsync/example_record_destroyer的一个样本实现。
数据库设置
如果你的目标数据库是MySQL,我们建议您确保它的下运行READ COMMITTED
隔离级别。这使得它更难的分析师锁定表和块复制。(之类的语句CREATE TABLE AS SELECT FROM ...
往往是罪魁祸首。)
实例下载地址https://github.com/square/sq-dbsync/archive/master.zip