Canal订阅MySQL的binlog的php实现

canal 工作原理

主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

1.canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2.MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
3.canal 解析 binary log 对象(原始为 byte 流)

实现流程

代码演示

<?php
require_once dirname(__FILE__).'/vendor/autoload.php';
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\CanalConnectorFactory;
use Com\Alibaba\Otter\Canal\Protocol\Column;
use Com\Alibaba\Otter\Canal\Protocol\Entry;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\RowData;
ini_set('display_errors', 'On');
error_reporting(E_ALL);
try {
    $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
    # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);
    //链接到Canal Server
    $client->connect("127.0.0.1", 11111);
    $client->checkValid();
    $client->subscribe("1001", "example", ".*\\..*");
    # $client->subscribe("1001", "example", "db_name.tb_name"); # 设置过滤
    while (true) {
        $message = $client->get(100);
        if ($entries = $message->getEntries()) {
            foreach ($entries as $entry) {
                //获取header信息
                $header = $entry->getHeader();
                //获取binlog文件
                $binLogFileName = $header->getLogfileName();
                //获取binlog文件偏移量
                $binLogFileOffset = $header->getLogfileOffset();
                //获取数据库scheme
                $schemeName = $header->getSchemaName();
                //获取表名
                $tableName = $header->getTableName();
                //获取操作事件
                $eventType = $header->getEventType();
                //获取EntryType
                $entryType = $entry->getEntryType();
                //只处理EntryType为ROWDATA的数据
                if($entryType != EntryType::ROWDATA)
                {
                    continue;
                }
                //获取StoreValue
                $storeValue = $entry->getStoreValue();
                //将StoreValue解析为RowChange
                $rowChange = new RowChange();
                $rowChange->mergeFromString($storeValue);
                //获取RowChange的EventType
                $eventType = $rowChange->getEventType();
                //只处理delete/insert/update操作
                if(!in_array($eventType,[EventType::DELETE,EventType::INSERT,EventType::UPDATE]))
                {
                    continue;
                }
                //获取RowChange的对应的sql操作
                $sql = $rowChange->getSql();
                echo "操作对象:{$schemeName}.{$tableName},SQL事件:{$eventType},执行SQL:{$sql}".PHP_EOL;
                //根据RowChange,获取所有数据的变更信息
                $rowChangeDatas = $rowChange->getRowDatas();
                //遍历RowChange,获取每一条数据的变更信息
                foreach($rowChangeDatas as $rowData)
                {
                    $beforeColumns = $afterColumns = [];
                    //根据EventType即sql操作类型,分别获取变化钱和变化后的值
                    switch($eventType)
                    {
                        //delete操作,只有变化前的值没有变化后的值
                        case EventType::DELETE:
                            $beforeColumns = $rowData->getBeforeColumns();
                            break;
                        //insert操作,没有变化前的值只有变化后的值
                        case EventType::INSERT:
                            $afterColumns = $rowData->getAfterColumns();
                            break;
                        //update操作,有变化前的值也有变化后的值
                        case EventType::UPDATE:
                            $beforeColumns = $rowData->getBeforeColumns();
                            $afterColumns = $rowData->getAfterColumns();
                            break;
                        //除了insert/delete/update操作,其他暂不处理
                        default:
                        break; 
                    }
                    if(empty($beforeColumns) && empty($afterColumns))
                    {
                        continue;
                    }
                    if(!empty($beforeColumns))
                    {
                        $beforeString = "【修改前】";
                        foreach($beforeColumns as $column)
                        {
                            $columnName = $column->getName();
                            $columnValue = $column->getValue();
                            $isUpdate = $column->getUpdated() ? 1 : 0;
                            $beforeString .= " {$columnName}:{$columnValue} ";
                        }
                        echo $beforeString .= PHP_EOL;
                    }
                    if(!empty($afterColumns))
                    {
                        $afterString = "【修改后】";
                        foreach($afterColumns as $column)
                        {
                            $columnName = $column->getName();
                            $columnValue = $column->getValue();
                            $isUpdate = $column->getUpdated() ? 1 : 0;
                            $afterString .= " {$columnName}:{$columnValue} ";
                        }
                        echo $afterString .= PHP_EOL;
                    }
                }
            }
	    sleep(1);
        } else {
            sleep(5);
        }
    }
    $client->disConnect();
} catch (\Exception $e) {
    echo $e->getMessage(), PHP_EOL;
}
类型 说明
eventType 1 新增(insert)
eventType 2 更新(update)
eventType 3 删除(delete)

效果展示

原始表数据
Canal订阅MySQL的binlog的php实现

修改数据

update canal.user set age= 28 where id= 5;

订阅结果
Canal订阅MySQL的binlog的php实现