首页 » Ecstore » 详解Ecstore计划任务的代码逻辑

详解Ecstore计划任务的代码逻辑

10185 2

Ecos的框架是建立在linux的基础上的,在windows上是会缺胳膊少腿的,其中计划任务用的是linux的crontab,之前网上见过不少人碰到ecstore的计划任务执行报错的案例,也给出了一定的解决方法,这其中有些是误打误撞。tiandi认为,一段代码没有执行或者报错,必须得从它的源头查起。

让我们来看看ecos的计划任务到底是怎么运行的:

1. 执行/app/system/controller/admin/crontab.php中的exec方法,执行base_contab_schedule::trigger_once($cron[‘id’])。

function exec($cron_id) {
        $this->begin('index.php?app=system&ctl=admin_crontab&act=index');
        $model = app::get('base')->model('crontab');
        $cron = $model->getRow('id', array('id'=>$cron_id));
        if(!$cron || (base_crontab_schedule::trigger_one($cron['id'])===false)) {
            $this->end(false, '执行失败');
        }
        $this->end(true, '执行成功');
    }

2. /base/lib/crontab/schedule.php的trigger_once方法,$cron_id为任务类名。执行system_queue::instance()->publish(‘crontab:’.$worker, $worker)后,会打印日志在/data/crontab/下,并且将crontab表该任务的last字段做时间更新。

static public function trigger_one($cron_id){
        if ($cron = app::get('base')->model('crontab')->getRow('id, last', array('id' => $cron_id, 'enabled => true'))){
            $now = time();            
            if (($now - $cron['last'])<60) {
                trigger_error(app::get('base')->_('1分钟之内不能重复执行'), E_USER_ERROR);
            }

            //add_task
            $worker = $cron['id'];
            system_queue::instance()->publish('crontab:'.$worker, $worker);
            self::__log($cron_id, $now, 'add queue ok');
            app::get('base')->model('crontab')->update(array('last'=>$now), array('id' => $cron_id));
        }
    }

3. /app/system/lib/queue.php中的publish方法,$queues的值为array(‘slow’),array(‘quick’),array(‘nor mal’)中的一个,然后请求控制器的publish。

public function publish($exchange_name, $worker, $params=array(), $routing_key=null){
        $queues = $this->__get_publish_queues($exchange_name);
        foreach($queues as $queue_name){
            $queue_data = array(
                'queue_name' => $queue_name,
                'worker' => $worker,
                'params' => $params);

            $this->get_controller()->publish($queue_name, $queue_data);
        }
        return true;
    }

4. /app/system/lib/queue/adapter/mysql.php的publish方法,向system_queue_mysql表插入一条记录,默认该记录own_thread_id为-1,在dbschema表中定义。

public function publish($queue_name,$queue_data){
        $time = time();
        $data = array('queue_name' => $queue_data['queue_name'],
                      'worker' => $queue_data['worker'],
                      'params' => serialize((array)$queue_data['params']),
                      'create_time' => $time);

        return $this->__model->insert($data);
    }

5. 到此点击时间处理完毕,并不会真正的执行计划任务。执行还是靠的linux的cron发起的php请求。linux下设置了两条cron,分别是
* * * * * /srv/www/www.suanjuzi.com/script/queue/queue.sh /opt/php/bin/php >/dev
* * * * * /opt/php/bin/php /srv/www/www.suanjuzi.com/script/crontab/crontab.php

6. 先看上面的第2条/script/crontab/crontab.php,实际上这个文件就主要做了一件事,调用base_crontab_schedule::trigger_all(),和后台点执行调用的trigger_one的代码逻辑差不多,只是这里是触发了所有的该执行的ecstore后台计划任务。

7. 回过来看上面第1条命令,script/queue/queue.sh是执行命令,后面是参数。
打开shell文件,可以看到queuelist是从/config/queuelist.php这个默认定义的文件返回,返回的值为slow,quick,normal,然后通过checkprocess查看当前linux进程是否有存在/script/queue.php slow[quick][normal],在的话显示active,不在的话,请求该php文件。之前计划任务不跑的原因就在这里,作为我们后面加的计划任务,都是属于normal,结果这里进程一直存在,所以就不会再继续再执行normal的计划任务,只会执行原先默认的quick和slow的计划任务。

8. /script/queue.php执行了下面代码

$queue_name = $argv[1];
$queues = system_queue::instance()->get_config('queues');
if ($num = (int)$queues[$queue_name]['thread']) {
    system_queue_consumer::instance('proc')->exec($queue_name, $num);
}

9. /app/system/lib/queue/consumer/proc.php中的exec方法,当前线程小于可允许最大值(/config/queuelist.php里定义)时,并且queue_mysql表的own_thread_id为-1时,通过new system_queue_consumer_proc_thread来创建新的线程,

while ($this->threadRunning < $max && !system_queue::instance()->is_end($queue_name)) {
      $this->running[] = new system_queue_consumer_proc_thread($queue_name,$phpExec);
      usleep(200000);
      $this->threadRunning++;
}

10. /app/system/lib/queue/consumer/proc/thread.php,通过proc_open建立新线程,超过2次报错,但是在出问题normal卡住的时候这边也没报错,所以应该跟这个throw无关。

while (($this->resource = proc_open($executable." ".$script." ".$queue_name, $descriptorspec, $this->pipes, NULL, $_ENV))===null) {
     $i++;
     if ($i>2) {
          throw new Exception(' cannot create new proccess for consume queue.', 30001);
     }
}

11.proc_open的第一个参数是命令行,调用PHP执行 /script/queue/queuescript.php $queue_name,

if($queue_message = system_queue::instance()->get($queue_name)){
    system_queue::instance()->run_task($queue_message);
    system_queue::instance()->ack($queue_message);
}

12./app/system/lib/queue.php里的run_task和ack方法。Run_task最终调用的自定义计划任务类的exec方法,而ack则最后删除了数据表里的相应记录。

public function run_task($queue_message){
        //todo: 异常处理
        $worker = $queue_message->get_worker();
        $params = $queue_message->get_params();

        $obj_task = new $worker();
        if ($obj_task instanceof base_interface_task) {
            call_user_func_array(array($obj_task, 'exec'), array($params));
            logger::info('task:'. get_class($obj_task). ' exec ok');
        }
        return true;
    }
public function ack($queue_message){
        $queue_id = $queue_message->get_id();
        return $this->__model->delete(array('id'=>$queue_id));
    }
文章评分4次,平均分5.0

本文原始地址:https://www.tiandiyoyo.com/2016/08/what_does_ecstore_do_for_cron/
本站所有文章,除了特别注明外,均为本站原创,转载请注明出处来自www.tiandiyoyo.com

您可能还会对以下文章感兴趣:

评论前先开启评论开关:


2 Comments

  1. 郑永 :

    沙发,好久不见!最近发的都是代码,看不懂哈。

载入分页评论...