MySQL单表千万级数据处理的思路分享

日前笔者需要处理MySQL单表千万级的电子元器件数据,进行数据归类, 数据清洗以及器件参数处理,进而得出国产器件与国外器件的替换兼容性数据,为电子工程师寻找国产替换件提供参考。

项目背景

在处理过程中,今天上午需要更新A字段,下午爬虫组完成了规格书或图片的爬取又需要更新图片和规格书字段,由于单表千万级深度翻页会导致处理速度越来越慢。

?

1

select a,b,c from db.tb limit 10000 offset 9000000

但是时间是有限的,是否有更好的方法去解决这种问题呢?

改进思路

是否有可以不需要深度翻页也可以进行数据更新的凭据?
是的,利用自增id列

观察数据特征

此单表有自增id列且为主键,根据索引列查询数据和更新数据是最理想的途径。

?

1

2

select a,b, c from db.tb where id=9999999;

update db.tb set a=x where id=9999999;

多进程处理

每个进程处理一定id范围内的数据,这样既避免的深度翻页又可以同时多进程处理数据。
提高数据查询速度的同时也提高了数据处理速度。
下面是我编写的任务分配函数,供参考:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

def mission_handler(all_missions, worker_mission_size):

"""

根据总任务数和每个worker的任务数计算出任务列表, 任务列表元素为(任务开始id, 任务结束id)。

例: 总任务数100个,每个worker的任务数40, 那么任务列表为:[(1, 40), (41, 80), (81, 100)]

:param all_missions: 总任务数

:param worker_mission_size: 每个worker的最大任务数

:return: [(start_id, end_id), (start_id, end_id), …]

"""

worker_mission_ids = []

current_id = 0

while current_id <= all_missions:

start_id = all_missions if current_id + 1 >= all_missions else current_id + 1

end_id = all_missions if current_id + worker_mission_size >= all_missions else current_id + worker_mission_size

if start_id == end_id:

if worker_mission_ids[-1][1] == start_id:

break

worker_mission_ids.append((start_id, end_id))

current_id += worker_mission_size

return worker_mission_ids

假设单表id最大值为100, 然后我们希望每个进程处理20个id,那么任务列表将为:

?

1

2

>>> mission_handler(100, 40)

[(1, 40), (41, 80), (81, 100)]

那么,
进程1将只需要处理id between 1 to 40的数据;
进程2将只需要处理id between 41 to 80的数据;
进程3将只需要处理id between 81 to 100的数据。

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

from concurrent.futures import ProcessPoolExecutor

def main():

# 自增id最大值

max_id = 30000000

# 单worker处理数据量

worker_mission_size = 1000000

# 使用多进程进行处理

missions = mission_handler(max_id, worker_mission_size)

workers = []

executor = ProcessPoolExecutor()

for idx, mission in enumerate(missions):

start_id, end_id = mission

workers.append(executor.submit(data_handler, start_id, end_id, idx))

def data_handler(start_id, end_id, worker_id):

pass

思路总结

  • 避免深度翻页进而使用自增id进行查询数据和数据
  • 使用多进程处理数据
  • 数据处理技巧

    记录处理成功与处理失败的数据id,以便后续跟进处理

    ?

    1

    2

    # 用另外一张表记录处理状态

    insert into db.tb_handle_status(row_id, success) values (999, 0);

    循环体内进行异常捕获,避免程序异常退出

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    21

    22

    23

    24

    25

    26

    def data_handler(start_id, end_id, worker_id):

    # 数据连接

    conn, cursor = mysql()

    current_id = start_id

    try:

    while current_id <= end_id:

    try:

    # TODO 数据处理代码

    pass

    except Exception as e:

    # TODO 记录处理结果

    # 数据移动到下一条

    current_id += 1

    continue

    else:

    # 无异常,继续处理下一条数据

    current_id += 1

    except Exception as e:

    return 'worker_id({}): result({})'.format(worker_id, False)

    finally:

    # 数据库资源释放

    cursor.close()

    conn.close()

    return 'worker_id({}): result({})'.format(worker_id, True)

    更新数据库数据尽量使用批量提交

    ?

    1

    2

    3

    4

    5

    6

    7

    8

    sql = """update db.tb set a=%s, b=%s where id=%s"""

    values = [

    ('a_value', 'b_value', 9999),

    ('a_value', 'b_value', 9998),

    ]

    # 批量提交,减少网络io以及锁获取频率

    cursor.executemany(sql, values)

    以上就是MySQL单表千万级数据处理的思路分享的详细内容,更多关于MySQL单表千万级数据处理的资料请关注钦钦技术栈其它相关文章!

    原文链接:https://juejin.cn/post/6969527649995587615

    版权声明:本文(即:原文链接:https://www.qin1qin.com/catagory/7055/)内容由互联网用户自发投稿贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 630367839@qq.com 举报,一经查实,本站将立刻删除。

    (0)
    上一篇 2022年 7月 31日 3:31:59
    下一篇 2022年 7月 31日 3:32:09

    软件定制开发公司

    相关阅读

    发表回复

    登录后才能评论
    通知:禁止投稿所有关于虚拟货币,币圈类相关文章,发现立即永久封锁账户ID!