Joomla-开源天空



mysql jobqueue

E-mail

原文地址:http://forge.mysql.com/tools/tool.php?id=69

只支持mysql 5.1.6以上版本。详细代码如下:

--
-- MySQL JobQueue v 0.1
-- Authors Geoffroy Cogniaux
--
-- MySQL JobQueue is a nice, simple, job queue system for MySQL ( 5.1.6 or above )
-- MySQL JobQueue allows you to submit queries or stored procedures
-- and let them be executed asynchronously by MySQL.
-- MySQL JobQueue allows you to specify a priority to your job and
-- use a retry counter.
-- example : CALL jobq.qsubmit('MyJOb','insert into log values(\'dummy\')',3,5)
-- Explain : When the queue events will start the qrun procedure, the job
--           'MyJOb' will run 3 times max ( if failed ) at the priority 5.
-- Priority can be between 1 and 9. If many jobs are candidate to run, they will
-- run sorted from 1 to 9.
--
-- NOTE : see the events example at the end of this script to create your own
--        schedule interval for the running queue.
--
-- send comments or report bugs at : 这个 E-mail 地址已经被防止灌水恶意程式保护,您需要激活 Java Script 才能观看 with subject :
-- MySQL JobQueue - [comment|bug].
--


-- Save the current session settings into @OLD_* user variables and switch the
-- session to the values used while this script runs. Each statement sits in a
-- MySQL version-conditional comment (/*!NNNNN ... */), so it is only executed
-- by servers at or above the stated version.

-- Character set / collation: remember the current values, then use utf8.
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8 */;

-- Remember, then disable, unique-key and foreign-key checks; remember the
-- SQL mode and switch it to NO_AUTO_VALUE_ON_ZERO.
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;


--
-- Create schema jobq
--

-- Make sure the jobq schema exists, then make it the default schema for
-- every statement that follows.
CREATE SCHEMA IF NOT EXISTS `jobq`;
USE `jobq`;

--
-- Definition of table `jobq`
--

-- The queue itself: one row per submitted job.
-- WARNING: re-running this script drops the table and every queued job in it.
DROP TABLE IF EXISTS `jobq`;
CREATE TABLE `jobq` (
  -- Surrogate key, assigned automatically on insert.
  `jobid`         INT UNSIGNED NOT NULL AUTO_INCREMENT,
  -- Free-form label given by the submitter.
  `jobname`       VARCHAR(45)  NOT NULL,
  -- SQL text that qrun prepares and executes.
  `jobcommand`    TEXT         NOT NULL,
  -- Values written by the procedures in this script:
  -- 'WAITING', 'RUNNING', 'SUCCESS', 'ERROR'.
  `jobstatus`     VARCHAR(8)   NOT NULL,
  -- Maximum number of attempts, and the number of failed attempts so far.
  `jobretrymax`   INT UNSIGNED NOT NULL,
  `jobretrycount` INT UNSIGNED NOT NULL,
  -- Account that submitted the job (qsubmit fills it from USER()).
  `jobsubmiters`  VARCHAR(45)  NOT NULL,
  -- Scheduling priority supplied through qsubmit.
  `jobpriority`   INT UNSIGNED NOT NULL,
  -- Refreshed automatically whenever the row is inserted or changed.
  `joblastaction` TIMESTAMP    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`jobid`),
  INDEX `STATUS_IDX` (`jobstatus`) USING HASH
) ENGINE=MyISAM DEFAULT CHARACTER SET utf8 COMMENT='job queue';


--
-- Definition of procedure `qpurge`
--

-- qpurge(): housekeeping procedure that removes every job whose status is
-- 'SUCCESS' from the jobq table. Jobs in any other status are left untouched.
-- Takes no parameters and returns no result set.
DROP PROCEDURE IF EXISTS `qpurge`;

DELIMITER $$

/*!50003 SET @TEMP_SQL_MODE=@@SQL_MODE, SQL_MODE='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER' */ $$
CREATE PROCEDURE `qpurge`()
BEGIN

-- The status column is named `jobstatus`; filtering on a column called
-- STATUS (which does not exist in jobq) made every call fail at run time.
DELETE FROM jobq
WHERE jobstatus = 'SUCCESS';

END $$
/*!50003 SET SESSION SQL_MODE=@TEMP_SQL_MODE */  $$

DELIMITER ;

--
-- Definition of procedure `qrun`
--

-- qrun(): runs every runnable job in the queue once.
--
-- A job is runnable when its status is 'WAITING' and it still has attempts
-- left (jobretrycount < jobretrymax). Runnable jobs are executed in priority
-- order, 1 first and 9 last; ties are broken by jobid.
--
-- For each job the procedure:
--   1. marks the row 'RUNNING',
--   2. prepares and executes the text stored in jobcommand,
--   3. on success marks the row 'SUCCESS'; on failure increments
--      jobretrycount and puts the job back to 'WAITING', or to 'ERROR' once
--      the retry budget is used up.
--
-- Takes no parameters and returns no result set of its own.
--
-- NOTE(review): jobcommand is executed as dynamic SQL. No SQL SECURITY clause
-- is given, so MySQL's default applies -- confirm who is allowed to insert
-- into jobq / call qsubmit before deploying.
DROP PROCEDURE IF EXISTS `qrun`;

DELIMITER $$

/*!50003 SET @TEMP_SQL_MODE=@@SQL_MODE, SQL_MODE='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER' */ $$
CREATE PROCEDURE `qrun`()
BEGIN

DECLARE done INT DEFAULT 0;             -- set to 1 by the NOT FOUND handler
DECLARE err INT DEFAULT 0;              -- set to 1 by the SQLEXCEPTION handler
DECLARE jobid_v INT;
DECLARE jobcommand_v TEXT;
DECLARE jobretrymax_v INT;
DECLARE jobretrycount_v INT;

-- Priority 1 runs first, as documented in the script header; jobid makes the
-- order deterministic among jobs of equal priority.
DECLARE curjob_c CURSOR FOR
  SELECT jobid, jobcommand, jobretrymax, jobretrycount
  FROM jobq
  WHERE jobstatus = 'WAITING' AND jobretrycount < jobretrymax
  ORDER BY jobpriority ASC, jobid ASC;

DECLARE CONTINUE HANDLER FOR SQLSTATE '02000' SET done = 1;
DECLARE CONTINUE HANDLER FOR SQLEXCEPTION SET err = 1;

OPEN curjob_c;

job_loop: LOOP
  FETCH curjob_c INTO jobid_v, jobcommand_v, jobretrymax_v, jobretrycount_v;
  IF done = 1 THEN
    LEAVE job_loop;
  END IF;

  UPDATE jobq
  SET jobstatus = 'RUNNING'
  WHERE jobid = jobid_v;

  -- Only errors raised by the job command itself count as a job failure.
  SET err = 0;
  SET @jobcommand = jobcommand_v;
  PREPARE my_cmd FROM @jobcommand;
  -- When PREPARE fails no statement named my_cmd exists, so there is nothing
  -- to execute or deallocate.
  IF err = 0 THEN
    EXECUTE my_cmd;
    DEALLOCATE PREPARE my_cmd;
  END IF;

  IF err = 1 THEN
    -- Record the failed attempt on this job only; it becomes 'ERROR' once
    -- the retry budget is exhausted, otherwise it waits for the next run.
    UPDATE jobq
    SET jobretrycount = jobretrycount_v + 1,
        jobstatus = CASE
                      WHEN jobretrycount_v + 1 >= jobretrymax_v THEN 'ERROR'
                      ELSE 'WAITING'
                    END
    WHERE jobid = jobid_v;
  ELSE
    UPDATE jobq
    SET jobstatus = 'SUCCESS'
    WHERE jobid = jobid_v;
  END IF;

  -- The job command may itself raise a NOT FOUND condition (SQLSTATE 02000),
  -- which trips the same handler as the cursor. Clear the flag so that only
  -- the next FETCH decides whether the queue is exhausted.
  SET done = 0;
END LOOP job_loop;

CLOSE curjob_c;


END $$
/*!50003 SET SESSION SQL_MODE=@TEMP_SQL_MODE */  $$

DELIMITER ;

--
-- Definition of procedure `qsubmit`
--

-- qsubmit(): adds a job to the queue in status 'WAITING'.
--
-- Parameters:
--   jobname_p      label stored in jobq.jobname
--   jobcommand_p   SQL text that qrun will prepare and execute
--   jobretry_p     maximum number of attempts (stored in jobretrymax)
--   jobpriority_p  priority from 1 to 9. A value above 9 or below 0 is stored
--                  as 9; 0 or NULL is stored as the default, 5.
--
-- The submitting account is taken from USER() and stored in jobsubmiters.
DROP PROCEDURE IF EXISTS `qsubmit`;

DELIMITER $$

/*!50003 SET @TEMP_SQL_MODE=@@SQL_MODE, SQL_MODE='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER' */ $$
CREATE PROCEDURE `qsubmit`(
  IN jobname_p VARCHAR(45),
  IN jobcommand_p TEXT,
  IN jobretry_p INT,
  IN jobpriority_p INT
)
BEGIN

DECLARE submiters_v VARCHAR(45);
DECLARE jobpriority_v INT DEFAULT 5;

-- Honour the caller's priority. Previously a valid value was discarded and
-- every job was stored with the default, so priorities had no effect.
SET jobpriority_v = CASE
  WHEN jobpriority_p BETWEEN 1 AND 9 THEN jobpriority_p
  WHEN jobpriority_p > 9 OR jobpriority_p < 0 THEN 9
  ELSE 5
END;

-- user@host can be longer than the 45 characters the variable and the
-- jobsubmiters column hold, so truncate explicitly instead of relying on
-- how the active SQL mode treats over-long values.
SET submiters_v = LEFT(USER(), 45);

INSERT INTO jobq
(
  jobname,
  jobcommand,
  jobstatus,
  jobretrymax,
  jobretrycount,
  jobsubmiters,
  jobpriority
)
VALUES
(
  jobname_p,
  jobcommand_p,
  'WAITING',
  jobretry_p,
  0,
  submiters_v,
  jobpriority_v
);

END $$
/*!50003 SET SESSION SQL_MODE=@TEMP_SQL_MODE */  $$

DELIMITER ;

-- Create the scheduled event that runs the queue: it calls jobq.qrun() every
-- 5 minutes. The event only fires while the MySQL event scheduler is enabled;
-- if it is not, turn it on with:
--   SET GLOBAL event_scheduler = ON;
--
-- IF NOT EXISTS keeps the script re-runnable and leaves an already existing
-- event (for example one whose schedule has been customised) untouched.

CREATE EVENT IF NOT EXISTS mysql_jobqueue
ON SCHEDULE EVERY 5 MINUTE
COMMENT 'MySQL JobQueue runner.'
DO CALL jobq.qrun();


-- Restore the session settings from the @OLD_* user variables, each exactly
-- once (CHARACTER_SET_CLIENT was previously restored twice).
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;

 

发表您的文章评论

您的姓名 (昵称)
标题:
评分: 很差一般较好很好
评论:
验证码:
请输入验证码

» » 登录 »   -   -