首页 > 代码库 > 数据库中间件Mycat源码解析(三):Mycat的SQL解析和路由

数据库中间件Mycat源码解析(三):Mycat的SQL解析和路由

mycat对sql的解析分为两部分,一个是普通sql,另一个是PreparedStatment。

下面以解析普通sql为例分析(另一种方式大同小异),sql从客户端发过来后server接收后会调用FrontendCommandHandler的handle方法,这个方法会调用FrontendConnection的query方法,接着query方法会调用ServerQueryHandler的query方法,接着调用ServerConnection的execute方法。如下图所示:

public void execute(String sql, int type) {
		//连接状态检查
		if (this.isClosed()) {
			LOGGER.warn("ignore execute ,server connection is closed " + this);
			return;
		}
		// 事务状态检查
		if (txInterrupted) {
			writeErrMessage(ErrorCode.ER_YES,
					"Transaction error, need to rollback." + txInterrputMsg);
			return;
		}

		// 检查当前使用的DB
		String db = this.schema;
		boolean isDefault = true;
		if (db == null) {
			db = SchemaUtil.detectDefaultDb(sql, type);
			if (db == null) {
				writeErrMessage(ErrorCode.ERR_BAD_LOGICDB, "No MyCAT Database selected");
				return;
			}
			isDefault = false;
		}
		
		// 兼容PhpAdmin's, 支持对MySQL元数据的模拟返回
		//// TODO: 2016/5/20 支持更多information_schema特性
		if (ServerParse.SELECT == type 
				&& db.equalsIgnoreCase("information_schema") ) {
			MysqlInformationSchemaHandler.handle(sql, this);
			return;
		}

		if (ServerParse.SELECT == type 
				&& sql.contains("mysql") 
				&& sql.contains("proc")) {
			
			SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql);
			if (schemaInfo != null 
					&& "mysql".equalsIgnoreCase(schemaInfo.schema)
					&& "proc".equalsIgnoreCase(schemaInfo.table)) {
				
				// 兼容MySQLWorkbench
				MysqlProcHandler.handle(sql, this);
				return;
			}
		}
		
		SchemaConfig schema = MycatServer.getInstance().getConfig().getSchemas().get(db);
		if (schema == null) {
			writeErrMessage(ErrorCode.ERR_BAD_LOGICDB,
					"Unknown MyCAT Database '" + db + "'");
			return;
		}

		//fix navicat   SELECT STATE AS `State`, ROUND(SUM(DURATION),7) AS `Duration`, CONCAT(ROUND(SUM(DURATION)/*100,3), '%') AS `Percentage` FROM INFORMATION_SCHEMA.PROFILING WHERE QUERY_ID= GROUP BY STATE ORDER BY SEQ
		if(ServerParse.SELECT == type &&sql.contains(" INFORMATION_SCHEMA.PROFILING ")&&sql.contains("CONCAT(ROUND(SUM(DURATION)/*100,3)"))
		{
			InformationSchemaProfiling.response(this);
			return;
		}
		
		/* 当已经设置默认schema时,可以通过在sql中指定其它schema的方式执行
		 * 相关sql,已经在mysql客户端中验证。
		 * 所以在此处增加关于sql中指定Schema方式的支持。
		 */
		if (isDefault && schema.isCheckSQLSchema() && isNormalSql(type)) {
			SchemaUtil.SchemaInfo schemaInfo = SchemaUtil.parseSchema(sql);
			if (schemaInfo != null && schemaInfo.schema != null && !schemaInfo.schema.equals(db)) {
				SchemaConfig schemaConfig = MycatServer.getInstance().getConfig().getSchemas().get(schemaInfo.schema);
				if (schemaConfig != null)
					schema = schemaConfig;
			}
		}

		routeEndExecuteSQL(sql, type, schema);

	}
最后有个routeEndExecuteSQL方法,它会首先调用RouteService的route方法先进行路由,然后调用HintSQLHandler的route方法,这个方法里调用RouteStrategy的route方法,这里使用了一个策略模式,包含下面几种sql类型,不同类型使用不同策略。
public final class ServerParse {

	public static final int OTHER = -1;
	public static final int BEGIN = 1;
	public static final int COMMIT = 2;
	public static final int DELETE = 3;
	public static final int INSERT = 4;
	public static final int REPLACE = 5;
	public static final int ROLLBACK = 6;
	public static final int SELECT = 7;
	public static final int SET = 8;
	public static final int SHOW = 9;
	public static final int START = 10;
	public static final int UPDATE = 11;
	public static final int KILL = 12;
	public static final int SAVEPOINT = 13;
	public static final int USE = 14;
	public static final int EXPLAIN = 15;
	public static final int EXPLAIN2 = 151;
	public static final int KILL_QUERY = 16;
	public static final int HELP = 17;
	public static final int MYSQL_CMD_COMMENT = 18;
	public static final int MYSQL_COMMENT = 19;
	public static final int CALL = 20;
	public static final int DESCRIBE = 21;
    public static final int LOAD_DATA_INFILE_SQL = 99;
    public static final int DDL = 100;

使用不同的路由方法是在routeNormalSqlWithAST中决定的,

public RouteResultset routeNormalSqlWithAST(SchemaConfig schema,
			String stmt, RouteResultset rrs, String charset,
			LayerCachePool cachePool) throws SQLNonTransientException {
		
		/**
		 *  只有mysql时只支持mysql语法
		 */
		SQLStatementParser parser = null;
		if (schema.isNeedSupportMultiDBType()) {
			parser = new MycatStatementParser(stmt);
		} else {
			parser = new MySqlStatementParser(stmt); 
		}

		MycatSchemaStatVisitor visitor = null;
		SQLStatement statement;
		
		/**
		 * 解析出现问题统一抛SQL语法错误
		 */
		try {
			statement = parser.parseStatement();
            visitor = new MycatSchemaStatVisitor();
		} catch (Exception t) {
	        LOGGER.error("DruidMycatRouteStrategyError", t);
			throw new SQLSyntaxErrorException(t);
		}

		/**
		 * 检验unsupported statement
		 */
		checkUnSupportedStatement(statement);


		DruidParser druidParser = DruidParserFactory.create(schema, statement, visitor);
		druidParser.parser(schema, rrs, statement, stmt,cachePool,visitor);

		/**
		 * DruidParser 解析过程中已完成了路由的直接返回
		 */
		if ( rrs.isFinishedRoute() ) {
			return rrs;
		}
		
		/**
		 * 没有from的select语句或其他
		 */
        DruidShardingParseInfo ctx=  druidParser.getCtx() ;
        if((ctx.getTables() == null || ctx.getTables().size() == 0)&&(ctx.getTableAliasMap()==null||ctx.getTableAliasMap().isEmpty()))
        {
		    return RouterUtil.routeToSingleNode(rrs, schema.getRandomDataNode(), druidParser.getCtx().getSql());
		}

		if(druidParser.getCtx().getRouteCalculateUnits().size() == 0) {
			RouteCalculateUnit routeCalculateUnit = new RouteCalculateUnit();
			druidParser.getCtx().addRouteCalculateUnit(routeCalculateUnit);
		}
		
		SortedSet<RouteResultsetNode> nodeSet = new TreeSet<RouteResultsetNode>();
		for(RouteCalculateUnit unit: druidParser.getCtx().getRouteCalculateUnits()) {
			RouteResultset rrsTmp = RouterUtil.tryRouteForTables(schema, druidParser.getCtx(), unit, rrs, isSelect(statement), cachePool);
			if(rrsTmp != null) {
				for(RouteResultsetNode node :rrsTmp.getNodes()) {
					nodeSet.add(node);
				}
			}
		}
		
		RouteResultsetNode[] nodes = new RouteResultsetNode[nodeSet.size()];
		int i = 0;
		for (Iterator<RouteResultsetNode> iterator = nodeSet.iterator(); iterator.hasNext();) {
			nodes[i] = iterator.next();
			i++;
		}		
		rrs.setNodes(nodes);		
		
		//分表
		/**
		 *  subTables="t_order$1-2,t_order3"
		 *目前分表 1.6 开始支持 幵丏 dataNode 在分表条件下只能配置一个,分表条件下不支持join。
		 */
		if(rrs.isDistTable()){
			return this.routeDisTable(statement,rrs);
		}
		
		return rrs;
	}
它使用druid做数据库连接池,支持分库分表,下面我们以多个表的分库分表路由策略为例子进行分析。

public static void findRouteWithcConditionsForTables(SchemaConfig schema, RouteResultset rrs,
			Map<String, Map<String, Set<ColumnRoutePair>>> tablesAndConditions,
			Map<String, Set<String>> tablesRouteMap, String sql, LayerCachePool cachePool, boolean isSelect)
			throws SQLNonTransientException {
		
		//为分库表找路由
		for(Map.Entry<String, Map<String, Set<ColumnRoutePair>>> entry : tablesAndConditions.entrySet()) {
			String tableName = entry.getKey().toUpperCase();
			TableConfig tableConfig = schema.getTables().get(tableName);
			if(tableConfig == null) {
				String msg = "can't find table define in schema "
						+ tableName + " schema:" + schema.getName();
				LOGGER.warn(msg);
				throw new SQLNonTransientException(msg);
			}
			if(tableConfig.getDistTables()!=null && tableConfig.getDistTables().size()>0){
				routeToDistTableNode(tableName,schema,rrs,sql, tablesAndConditions, cachePool,isSelect);
			}
			//全局表或者不分库的表略过(全局表后面再计算)
			if(tableConfig.isGlobalTable() || schema.getTables().get(tableName).getDataNodes().size() == 1) {
				continue;
			} else {//非全局表:分库表、childTable、其他
				Map<String, Set<ColumnRoutePair>> columnsMap = entry.getValue();
				String joinKey = tableConfig.getJoinKey();
				String partionCol = tableConfig.getPartitionColumn();
				String primaryKey = tableConfig.getPrimaryKey();
				boolean isFoundPartitionValue = http://www.mamicode.com/partionCol != null && entry.getValue().get(partionCol) != null;> mycat会先找主键(支持多主键),根据主键去找不同的node节点,然后在不同的node分别执行sql,这样它就获取了sql的路由表,所谓的路由表就是查找表存在于哪些节点中。这个如果是在依据主键分库分表(同时存在多种分片类型如下图所示)的情况下主要通过分析sql中的存在的表名和主键的键值在schema配置中通过算法(RuleAlgorithm)查找的(如果没有主键范围就路由到所有节点),找到节点后,才具体去执行sql。

PartitionByDate
PartitionByFileMap
PartitionByHashMod
PartitionByHotDate
PartitionByJumpConsistentHash
PartitionByLong
PartitionByMod
PartitionByMonth
PartitionByMurmurHash
PartitionByPattern
PartitionByPrefixPattern
PartitionByRangeDateHash
PartitionByRangeMod
PartitionByString
PartitionDirectBySubString

在上面提到的routeEndExecuteSQL方法中找到路由节点后它会调用NonBlockingSession的execute方法,它分为单节点模式和多节点模式,下面以多节点模式为例,在这种情况下它会调用MultiNodeQueryHandler的execute方法。

public void execute() throws Exception {
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			this.reset(rrs.getNodes().length);
			this.fieldsReturned = false;
			this.affectedRows = 0L;
			this.insertId = 0L;
		} finally {
			lock.unlock();
		}
		MycatConfig conf = MycatServer.getInstance().getConfig();
		startTime = System.currentTimeMillis();
		LOGGER.debug("rrs.getRunOnSlave()-" + rrs.getRunOnSlave());
		for (final RouteResultsetNode node : rrs.getNodes()) {
			BackendConnection conn = session.getTarget(node);
			if (session.tryExistsCon(conn, node)) {
				LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave());
				node.setRunOnSlave(rrs.getRunOnSlave());	// 实现 master/slave注解
				LOGGER.debug("node.getRunOnSlave()-" + node.getRunOnSlave());
				_execute(conn, node);
			} else {
				// create new connection
				LOGGER.debug("node.getRunOnSlave()1-" + node.getRunOnSlave());
				node.setRunOnSlave(rrs.getRunOnSlave());	// 实现 master/slave注解
				LOGGER.debug("node.getRunOnSlave()2-" + node.getRunOnSlave());
				PhysicalDBNode dn = conf.getDataNodes().get(node.getName());
				dn.getConnection(dn.getDatabase(), autocommit, node, this, node);
				// 注意该方法不仅仅是获取连接,获取新连接成功之后,会通过层层回调,最后回调到本类 的connectionAcquired
				// 这是通过 上面方法的 this 参数的层层传递完成的。
				// connectionAcquired 进行执行操作:
				// session.bindConnection(node, conn);
				// _execute(conn, node);
			}

		}
	}

到此优化后的sql被发给路由结果中的各个节点执行。



数据库中间件Mycat源码解析(三):Mycat的SQL解析和路由