使 PostgreSQL 滴答运行:pg_cron 中的新特性

2023/11/22 15:51
阅读数 22

原文链接:

https://www.citusdata.com/blog/2023/10/26/making-postgres-tick-new-features-in-pg-cron/

作者:Marco Slot
2023年10月26日

pg_cron 是一个开源的PostgreSQL扩展,提供了基于 cron 的调度器,用于定期运行 SQL 命令。几乎每个管理 PostgreSQL 的服务都支持 pg_cron,并且它已经成为许多 PostgreSQL 用户的标准工具。自从我在Citus全职工作以来,pg_cron 一直是我的副业项目,因此我试图使其结构简单、可靠且易于维护。当然了,随着用户数量的增加,功能需求列表也在不断延长。在 Postgres 社区的帮助下,pg_cron 随着时间的推移越来越强大。

我们最近在 1.6 版本中增加了对 PostgreSQL 16 的支持,但可能过去一年中在 pg_cron 中添加的最令人兴奋的功能(在 1.5 版本中)是能够每隔几秒钟安排一项任务。我曾经对这个功能的想法持保留态度,因为(a)这不是常规 cron 能做的事情;并且(b)如果每隔几秒钟就发生一次,pg_cron 中的任何问题都会变得更为严重。然而,到目前为止,pg_cron 已经经过了相当充分的实战测试,而且按秒计划的任务已经成为迄今为止最受欢迎的 pg_cron 功能需求。

每隔几秒钟安排一项任务

能够执行按秒计划的任务使你能够快速响应数据库中的新事件。一些示例用例包括:

  • 更新实时洞察的聚合

  • 检测异常(例如:同一 IP 的多次请求)

  • 轮询外部源(例如:频繁地从远程服务器同步)

  • 实现更为复杂的任务调度工作流

自 pg_cron 1.5 版本起,你可以轻松安排每 1-59 秒执行的任务:

  
  
  
  1. --每10秒调用我的过程

  2. SELECT cron.schedule('call-my-agent','10 seconds','call my_agent()')

不允许间隔时间超过 59 秒的原因是,现有的 cron 计划已经允许每分钟运行一次作业,而且这种逻辑更可靠地处理时钟跳跃。不允许更低的间隔时间(例如,毫秒)的原因是,这是一种可能导致问题的不同类型的工作负载。因此,1-59 看起来是一个低维护、关键任务项目的安全范围。

提示:请注意,每个作业运行仍然默认记录在 cron.job_run_details 中,几个月后,每几秒运行一次作业,这可能会变得非常大。如果你预计会有非常高的数量,你可以选择禁用 cron.log_run 设置。建议你至少设置一个 pg_cron 作业来清理 pg_cron 后的内容:

  
  
  
  1. --每天中午删除当前用户的旧 cron.job_run_details 记录

  2. SELECT cron.schedule('delete-job-run-details','0 12 * * *', $$DELETE FROM cron.job_run_details WHERE end_time < now()- interval '3 days'$$);

在 PostgreSQL 中可扩展的并行任务队列执行器

秒级粒度的调度使你能够使用 pg_cron 作为基础调度原语,在其基础上构建更为复杂的调度器,而无需修改 pg_cron 本身。

pg_cron 用户的一个常见请求是能够调度一次性命令,这对于将大任务移到后台或一次安排多个独立操作很有帮助。例如,你可能想从另一个系统中加载数据批次,应用转换,对许多不同的表执行操作等等。然而,这也带来了许多围绕失败处理的问题,而 pg_cron 本不是为解决这些问题而设计的。相反,你可以在 pg_cron 的基础上构建这样的基础设施。

下面,我们提供了一个基本的(公有领域)实现,用于在 pg_cron 之上的 PL/pgSQL 中执行一次性工作的任务队列执行器:

  
  
  
  1. --用于跟踪要立即执行的作业的表

  2. CREATE TABLE job_queue (

  3. jobid bigserial primary key,

  4. command text notnull,

  5. search_path text notnulldefault'pg_catalog',

  6. attempts intnotnulldefault0,

  7. max_attempts intnotnulldefault5,

  8. last_attempt timestamptz,

  9. last_error text

  10. );

  11. --用于跟踪作业失败的表

  12. CREATE TABLE job_errors (

  13. jobid bigint notnull,

  14. command text notnull,

  15. message text notnull,

  16. start_time timestamptz notnull,

  17. end_time timestamptz notnull

  18. );

  19. CREATE OR REPLACE FUNCTION schedule_once(p_command text)

  20. RETURNS void LANGUAGE plpgsql AS $fn$

  21. BEGIN

  22. INSERT INTO job_queue (command, search_path)

  23. VALUES (p_command, current_setting('search_path'));

  24. END; $fn$;

  25. CREATE OR REPLACE PROCEDURE run_jobs()

  26. LANGUAGE plpgsql AS $fn$

  27. DECLARE

  28. v_ctid tid;

  29. v_jobid bigint;

  30. v_command text;

  31. v_search_path text;

  32. v_message text;

  33. v_success bool;

  34. v_attempts int;

  35. v_max_attempts int;

  36. v_start_time timestamptz;

  37. v_end_time timestamptz;

  38. BEGIN

  39. LOOP

  40. --从队列中获取作业

  41. SELECT ctid, jobid, command, search_path, attempts +1, max_attempts

  42. INTO v_ctid, v_jobid, v_command, v_search_path, v_attempts, v_max_attempts

  43. FROM job_queue

  44. WHERE last_attempt isnull OR last_attempt < now()- interval '10 seconds'

  45. LIMIT 1 FOR UPDATE SKIP LOCKED;

  46. IF NOT FOUND THEN

  47. --找不到作业,退出,但很快会恢复

  48. EXIT;

  49. END IF;

  50. v_start_time := now();

  51. BEGIN

  52. --执行命令

  53. SET LOCAL search_path TO v_search_path;

  54. EXECUTE v_command;

  55. RESET search_path;

  56. v_message :='Success';

  57. v_success :=true;

  58. EXCEPTION WHEN others THEN

  59. --命令失败,记录并存储错误消息

  60. RAISE WARNING 'scheduled job failed: %', SQLERRM;

  61. v_message := SQLERRM;

  62. v_success :=false;

  63. END;

  64. v_end_time := now();

  65. IF v_success OR v_attempts >= v_max_attempts THEN

  66. --如果成功或我们尝试次数超过最大尝试次数,则删除作业

  67. DELETE FROM job_queue WHERE ctid = v_ctid;

  68. IF NOT v_success THEN

  69. --目前我们只在出错的情况下记录,以减少冗余插入

  70. INSERT INTO job_errors (jobid, command, message, start_time, end_time)

  71. VALUES (v_jobid, v_command, v_message, v_start_time, now());

  72. END IF;

  73. ELSE

  74. --更新尝试次数并稍后重试

  75. UPDATE job_queue

  76. SET attempts = v_attempts, last_attempt = now(), last_error = v_message

  77. WHERE ctid = v_ctid;

  78. END IF;

  79. COMMIT;

  80. END LOOP;

  81. END; $fn$;

  82. --通过pg_cron并行运行多达4个作业

  83. SELECT cron.schedule('job-runner-1','5 seconds','call run_jobs()');

  84. SELECT cron.schedule('job-runner-2','5 seconds','call run_jobs()');

  85. SELECT cron.schedule('job-runner-3','5 seconds','call run_jobs()');

  86. SELECT cron.schedule('job-runner-4','5 seconds','call run_jobs()');

在设置了任务队列后,现在你可以安排一次性的任务,这些任务通常会在 5 秒内开始,并且即使你断开连接,它们也会完成执行:

  
  
  
  1. --在后台启动长时间运行的作业:

  2. SELECT schedule_once('create table random as select random() from generate_series(1,10000000) s');

系统可以并行运行多个任务,一旦激活,它会快速连续地运行任务,而无需产生新的进程开销,使其能够扩展到大量的任务。run_jobs 过程还会尝试每个任务最多 5 次,每次运行之间至少间隔 10 秒。永久性错误会被记录到 job_errors 表中。

提示:请记住,当使用这种模式时,你的 cron.job_run_details 表会快速填满。考虑在设置中禁用 cron.log_run 设置(以跳过 cron.job_run_details)和/或 cron.log_statement 设置(以跳过 PostgreSQL 日志)。

使用 pg_cron 在任务队列模式中的示例

有无数种方式可以使用这种任务队列模式。对于 Citus 数据库用户来说,一个有趣的示例可能是在使用 基于模式的分片 时管理大量的模式。例如,如果你想在许多模式中添加一个新列:

  
  
  
  1. --在所有分布式模式中的表中添加列:

  2. select schedule_once(format('alter table %I.data add column extra jsonb', schema_name))from citus_schemas;

通过这种方式执行 ALTER TABLE 操作,而不是遍历 Postgres 的模式,你可以避免运行一个持有激进锁定的长时间事务,并能有效地并行化工作。

用 pg_cron 愉快地安排任务吧!

希望这篇文章能为你提供其他想法,展示如何使用 pg_cron 来自动化你的 PostgreSQL 工作流。如果你想开始使用,pg_cron 的主要文档位于 pg_cron GitHub 仓库。


由 Marco Slot 撰写

前 Microsoft Citus 数据库引擎的主要工程师。曾在 Postgres Conf EU、PostgresOpen、pgDay Paris、Hello World、SIGMOD 以及许多聚会上发表演讲。Citus Con:一个针对 Postgres 的活动的演讲选择团队成员。分布式系统博士。热爱山地徒步。


本文分享自微信公众号 - 开源软件联盟PostgreSQL分会(kaiyuanlianmeng)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部