Victor's Code Journey
Victor's Code Journey

目录

深入理解 Apache Calcite-Catalog

警告
本文最后更新于 2022-01-07,文中内容可能已过时。

Calcite 支持管理底层数据库的元数据信息和根据元数据优化查询 SQL。但是 Calcite 不直接存储底层系统中的元数据信息,用户需要先将元数据注册到 Calcite Catalog 中,才可以使用Calcite提供的能力。

在 Caclite 中,Catalog 主要用来定义 SQL 查询过程中所需要的元数据和命名空间,主要包含以下内容:

  • Schema: 表和函数的命名空间。也支持任意级别的嵌套子schema。
  • Table: 代表一个数据集,其中所有字段的类型由RelDataType定义。
  • RelDataType: 代表一个数据集中的所有字段。简单的说,描述了表中一行数据的类型。
  • Function: 代表注册在 Schema 中的一个函数,接收参数并返回类型。
  • Lattice: 晶格,用于定义多维数据模型和自动化管理物化视图(Materialized View)的核心概念。

Catalog的基础实现是抽象类 CalciteSchema。CalciteSchema 有 CachingCalciteSchema 和 SimpleCalciteSchema 两个子类,他们的区别主要是是否查找、函数和类型。

classDiagram
    class CalciteSchema {
        - CalciteSchema parent
        + Schema schema
        + Sring name
        # NameMap~CalciteSchema~ subSchemaMap
        + add(String name, Schema schema) CalciteSchema
        + add(String tableName, Table table) TableEntry
        + add(String name, RelProtoDataType type) TypeEntry
        - add(String name, Function function) FunctionEntry
        + getSubSchema(String schemaName, boolean caseSensitive) CalciteSchema
        + getTable(String tableName, boolean caseSensitive) TableEntry
        + getType(String name, boolean caseSensitive) TypeEntry
        + getFunctions(String name, boolean caseSensitive) Collection~Function~
        + isCacheEnabled() boolean
        + setCache(boolean cache)
    }
    CalciteSchema o--	Schema
    <<Abstract>> CalciteSchema
    class CachingCalciteSchema
    CalciteSchema <|--CachingCalciteSchema
    note for CachingCalciteSchema "支持查询缓存的Catalog 实现"
    class SimpleCalciteSchema
    CalciteSchema <|--SimpleCalciteSchema
    note for SimpleCalciteSchema "简单的Catalog 实现"

CalciteSchema 实际上是 Schema的 Wrapper,两者间是 1:1 的关系。由于 Schema 支持嵌套,CalciteSchema也支持了嵌套关系:

  • Parent 字段就指向了父节点
  • schema 字段是内部的 Schema

entry

CalciteSchema 使用下面的字段存储 name 和 entry(table,type,function,lattice) 的映射关系,支持通过 add*()方法添加 ,get*()查找和remove*() 删除。

注意, 上述的修改方法不会影响内部的 Schema。

public abstract class CalciteSchema {
  ... ... 
  /** Tables explicitly defined in this schema. Does not include tables in
   * {@link #schema}. */
  protected final NameMap<TableEntry> tableMap;
  private final LazyReference<Lookup<TableEntry>> tables = new LazyReference<>();
  protected final NameMultimap<FunctionEntry> functionMap;
  protected final NameMap<TypeEntry> typeMap;
  protected final NameMap<LatticeEntry> latticeMap;
  protected final NameSet functionNames;
  protected final NameMap<FunctionEntry> nullaryFunctionMap;
  ... ...
  public abstract static class Entry {
    public final CalciteSchema schema;
    public final String name;

    protected Entry(CalciteSchema schema, String name) {
      this.schema = requireNonNull(schema, "schema");
      this.name = requireNonNull(name, "name");
    }

    /** Returns this object's path. For example ["hr", "emps"]. */
    public final List<String> path() {
      return schema.path(name);
    }
  }
}
  • NameMap<V>内部有一个NavigableMap<String,V>(继承自SortedMap),支持返回子视图,支持范围查询。
  • NameMultimap<V>内部是一个NameMap<List<V>>
  • NameSet内部是一个 NameMap<Object>,其中 value 都是默认值Object DUMMY = new Object();

Entry的子类有4类

classDiagram
  class Entry{
    + CalciteSchema schema
    + String name
    + path() List~String~ 
  }
  class TableEntry{
    + getTable() Table
  }
  <<Abstract>> TableEntry
  class TableEntryImpl
  TableEntry <|.. TableEntryImpl
  Entry <|-- TableEntry

  class TypeEntry{
    + getType() RelProtoDataType
  }
  <<Abstract>> TypeEntry
  class TypeEntryImpl
  TypeEntry <|.. TypeEntryImpl
  Entry <|-- TypeEntry

  class FunctionEntry{
    + getFunction() Function
    + isMaterialization() boolean
  }
  <<Abstract>> FunctionEntry
  class FunctionEntryImpl
  FunctionEntry <|.. FunctionEntryImpl
  Entry <|-- FunctionEntry

  class LatticeEntry{
    + getLattice() Lattice
    + getStartTable() TableEntry
  }
  <<Abstract>> LatticeEntry
  class LatticeEntryImpl
  LatticeEntry <|.. LatticeEntryImpl
  Entry <|-- LatticeEntry

Lookup 代表 Catalog 中的一次(大小写敏感/不敏感)查询。

public interface Lookup<T> {
  // 根据名称查找值
  @Nullable T get(String name);
  // 根据名称查找值(忽略大小写)
  @Nullable Named<T> getIgnoreCase(String name);
  // 查找符合模式的名字(大小写敏感)
  // LikePattern sql like 风格。
  Set<String> getNames(LikePattern pattern);
  // =============== 工厂方法 ===============
  // 空 Lookup
  static <T> Lookup<T> empty() {
    return (Lookup<T>) EmptyLookup.INSTANCE;
  }
  // 基于 NameMap的 LookUp
  static <T> Lookup<T> of(NameMap<T> map) {
    return new NameMapLookup<>(map);
  }
  // 多个 Lookup 拼接
  static <T> Lookup<T> concat(Lookup<T>... lookups) {
    return new ConcatLookup<>(lookups);
  }

Lookup 有以下实现:

classDiagram
  class Lookup~T~
  <<Interface>> Lookup
  class IgnoreCaseLookup~T~
  <<Abstract>> IgnoreCaseLookup
  Lookup <|.. IgnoreCaseLookup
  class CompatibilityLookup~T~
  IgnoreCaseLookup <|-- CompatibilityLookup
  class ConcatLookup~T~
  Lookup <|.. ConcatLookup
  class EmptyLookup~T~
  Lookup <|.. EmptyLookup
  class LoadingCacheLookup~T~{
    - Lookup~T~ delegate
    - LoadingCache~String, T~ cache
    - LoadingCache~String, Named~ cacheIgnoreCase
  }
  Lookup <|.. LoadingCacheLookup
  class NameMapLookup~T~
  Lookup <|.. NameMapLookup
  class SnapshotLookup~T~{
   - Lookup~T~ delegate
   - LazyReference~Lookup~ cachedDelegate
   - boolean enabled
  }
  Lookup <|.. SnapshotLookup
  class TransformingLookup~S,T~{
    - Lookup~S~ delegate
    - BiFunction~S, String, T~ transform
  }
  • IgnoreCaseLookup: 是忽略大小写的 Lookup
    • CompatibilityLookup: 支持外部注入的 Lookup 逻辑
  • EmptyLookup: 空 Lookup
  • ConcatLookup: 多个 Lookup 结果合并(数组顺序是优先级)
  • LoadingCacheLookup: 支持缓存的 Lookup
  • NameMapLookup: 基于 NameMap的 LookUp
  • SnapshotLookup: 存储 Lookup 结果的查询快照
  • TransformingLookup: 支持结果变换的 Lookup。

schema是表和函数的命名空间。也支持任意级别的嵌套子schema。大多数数据库的实现是有层级限制的,例如,大部分的 JDBC 仅支持:

  • 一级 Schema
  • 二级 database+catalog
public interface Schema {
  /*根据表名返回对应表,或null*/
  @Nullable Table getTable(String name);
  /*返回所有表名*/
  Set<String> getTableNames();
  /*根据表名返回对应的类型,或null*/
  @Nullable RelProtoDataType getType(String name);
  /*返回所有表的行类型名*/
  Set<String> getTypeNames();
  /*返回函数名对应的所有函数,或空集*/
  Collection<Function> getFunctions(String name);
  /*返回所有函数名*/
  Set<String> getFunctionNames();
  /*返回名称对应的子schema*/
  @Nullable Schema getSubSchema(String name);
  /*返回所有子schema的名称*/
  Set<String> getSubSchemaNames();
  /* 返回schema 在代码中调用的表达式 */
  Expression getExpression(SchemaPlus parentSchema, String name);
  /**
   * 这个schema是否可以创建新的表,函数和子Schema,
   * 注意,即使这个方法返回了true,schema 内部的map(存储)也不会被修改,
   * Calcite 将定义的对象存储在包装类里面
   */
  boolean isMutable();
  /*返回当前schema的快照*/
  Schema snapshot(SchemaVersion version);
}

下图是 calcite-core 包中的Schema类图。

classDiagram
  class Schema
  <<Interface>> Schema
  class AbstractSchema
  Schema <|.. AbstractSchema
  class ReflectiveSchema
  AbstractSchema <|-- ReflectiveSchema
  class MetadataSchema
  AbstractSchema <|-- MetadataSchema
  class CloneSchema
  AbstractSchema <|-- CloneSchema
  class JdbcBaseSchema
  <<Abstract>> JdbcBaseSchema
  Schema <|.. JdbcBaseSchema
  class JdbcCatalogSchema
  JdbcBaseSchema <|-- JdbcCatalogSchema
  class JdbcSchema
  JdbcBaseSchema <|-- JdbcSchema
  class DelegatingSchema
  Schema <|.. DelegatingSchema
  class SchemaPlus
  <<Interface>> SchemaPlus
  class SchemaPlusImpl
  Schema <|.. SchemaPlusImpl
  class MySchemaPlus
  Schema <|.. MySchemaPlus
  class SemiMutableSchema
  <<Interface>> SemiMutableSchema
  • AbstractSchema:
    • ReflectiveSchema: 通过反射对象获取Java类的schema.
    • MetadataSchema: 适配avatica Meta的schema
    • CloneSchema: 支持在内存中从JDBC schema拷贝tables.
  • JdbcBaseSchema: 从JDBC数据源中获取Schema信息。
    • JdbcSchema: 由JDBC数据源支持的Schema的实现。
    • JdbcCatalogSchema: 基于JDBC catalog的 Schema 实现。
  • DelegatingSchema: 代理Schema,简单的包装器。
  • SchemaPlus: Schema接口的扩展。是Calcite包装schema实现的包装器。
    • SchemaPlusImpl: SchemaPlus接口的实现类。将CalciteSchema(wrapper)适配到SchemaPlus接口。
    • MySchemaPlus: 执行计划中使用。
  • SemiMutableSchema: 支持新增物化视图的schema(在calcite 中未被使用)。

AbstractSchema 对 Schema 接口的方法进行了实现,并提供了可重写的 getTableMap、getFunctionMultimap 和 getSubSchemaMap 方法,用于向 Schema 中注册表、函数和子 ReflectiveSchema 和 JdbcCatalogSchema 都是继承了 AbstractSchema 完成 Schema 注册。

在calcite的其他包中还有其他的实现,例如 CsvSchema,FileSchema等。

SchemaPlus 是对 Schema接口的扩展,用户自定义的schema不需要实现这个接口,但是schema在系统实际传递中,会被这个接口包装。换句话说,就是用户只需要创建schema,而在Calcite系统的其他地方使用schema时,是通过SchemaPlus来访问的。Calcite 内部提供了 SchemaPlus 的生成方法,例如:CalciteSchema#plus() 方法。

public interface SchemaPlus extends Schema {

  @Nullable SchemaPlus getParentSchema();
  String getName();
  @Override @Nullable SchemaPlus getSubSchema(String name);
  /** 添加一个schema 作为子 schema ,并返回包装后的 SchemaPlus */
  SchemaPlus add(String name, Schema schema);
  /** Adds a table to this schema. */
  void add(String name, Table table);
  /** Adds a function to this schema. */
  void add(String name, Function function);
  /** Adds a type to this schema.  */
  void add(String name, RelProtoDataType type);
  /** Adds a lattice to this schema. */
  void add(String name, Lattice lattice);
  @Override boolean isMutable();
  /** Returns an underlying object. */
  <T extends Object> @Nullable T unwrap(Class<T> clazz);
  void setPath(ImmutableList<ImmutableList<String>> path);
  void setCacheEnabled(boolean cache);
  boolean isCacheEnabled();
}

Table代表一个数据集,其中所有字段的类型由RelDataType定义。Table的典型创建方法是Calcite从schema中调用getTable方法。一个表不需要知道自己的name。这是为了表的重用(有多个名字或在多个schema中),这个概念和UNIX文件系统中的Inode有相似之处。

public interface Table {
  /* 返回表一行的类型(字段的名字和其类型) */
  RelDataType getRowType(RelDataTypeFactory typeFactory);

  /** 返回表的统计 */
  Statistic getStatistic();

  /** 返回表的类型,例如: table, view, index, join, stream 等*/
  Schema.TableType getJdbcTableType();

  /** 给定的列是否可以上卷 */
  boolean isRolledUp(String column);

  /** 判断给定的上卷列能不能用于给定的聚合函数。 */
  boolean rolledUpColumnValidInsideAgg(String column, SqlCall call,
      @Nullable SqlNode parent, @Nullable CalciteConnectionConfig config);
}

下图是 calcite-core 包中的部分Table类图。

classDiagram
%% 核心接口:Table
    class Table {
        <<interface>>
        + getRowType(RelDataTypeFactory) RelDataType
        + getStatistic() Statistic
        + getJdbcTableType() Schema.TableType
    }
    %% Table 的主要直接子接口
    class ScannableTable {
        <<interface>>
        +scan(DataContext root) Enumerable~Object[]~
    }

    class FilterableTable {
        <<interface>>
        +scan(DataContext root, List~RexNode~ filters) Enumerable~Object[]~
    }

    class ProjectableFilterableTable {
        <<interface>>
        +scan(DataContext root, List~RexNode~ filters, int[] projects) Enumerable~Object[]~
    }

    class TranslatableTable {
        <<interface>>
        +toRel(RelOptTable.ToRelContext context, RelOptTable table) RelNode
    }

    class StreamableTable {
        <<interface>>
        +stream() Table
    }

    class QueryableTable{
     <<interface>>
     + asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName)  Queryable~T~
    }
    QueryableTable --|> Table

    %% 抽象实现类
    class AbstractQueryableTable {
        <<abstract>>
        #AbstractQueryableTable(String elementType)
        +getRowType(...) RelDataType
        +getElementType() String
    }

    class AbstractTable {
        <<abstract>>
        +AbstractTable()
        +getRowType(...) RelDataType
        +getStatistic() Statistic
        +getJdbcTableType() Schema.TableType
    }

    %% 实现关系
    Table <|-- ScannableTable
    Table <|-- FilterableTable
    Table <|-- ProjectableFilterableTable
    Table <|-- TranslatableTable
    Table <|-- StreamableTable

    %% 抽象类实现接口
    AbstractTable ..|> Table
    AbstractQueryableTable ..|> QueryableTable
    AbstractQueryableTable --|> AbstractTable

    %% 抽象类的常见具体用途
    class ViewTable {
        +ViewTable(...)
    }
    class MaterializedViewTable {
        +MaterializedViewTable(...)
    }
    class JdbcTable
    AbstractQueryableTable <|-- JdbcTable
    %% 具体实现继承抽象类
    ViewTable --|> AbstractTable
    MaterializedViewTable --|> AbstractTable
类/接口角色与核心作用
Table (接口)最顶层的表抽象。定义了获取行类型(RowType)、统计信息(Statistic)和表类型(JdbcTableType)等元数据的基本方法。
ScannableTable (接口)可扫描表。表示可以直接枚举所有行的最基础表类型。scan方法返回一个Enumerable对象。
FilterableTable (接口)可过滤表。在ScannableTable基础上增强,允许在扫描时将过滤条件下推,减少数据读取量。
ProjectableFilterableTable (接口)可投影和过滤的表。进一步允许将列选择(projects)条件下推,是最强大的基础表类型之一。
TranslatableTable (接口)可翻译的表。这是Calcite优化的核心。实现此接口的表可以将自己“翻译”成一个关系代数表达式(RelNode),让优化器能对其进行完整的规则优化(如谓词下推、聚合计算等)。很多高级适配器(如JDBC、Cassandra)都实现此接口。
StreamableTable (接口)可流的表。表示可以产生流式数据的表。
AbstractTable (抽象类)基础的骨架实现。提供了getRowType, getStatistic等方法的简单默认实现。自定义简单表时通常直接继承此类。ViewTable和MaterializedViewTable就继承自它。
AbstractQueryableTable (抽象类)用于实现可查询的表,与Calcite的查询能力集成。
ViewTable(类)用于视图处理,通过将视图定义语句 viewSql 转化为 AST 及关系代数,并在 toRel 处理过程中将原有的视图查询语句展开,变换为对原始表的查询,从而实现视图语义。
MaterializedViewTable (类)物化视图表

RelDataType 代表了关系表达式返回的数据行类型或者标量表达式的类型,Calcite 支持了所有的 SQL 数据类型,也包括结构和数组类型。RelDataType 接口中的主要方法如下:

public interface RelDataType {
  ... ...
  // 获取结构类型中的字段,Calcite 中关系表达式返回的数据行类型使用 RelDataType 表示,每一列的类型通过 RelDataTypeField 表示
  // RelDataTypeField 内部仍然封装了 RelDataType 表示字段类型
  List<RelDataTypeField> getFieldList();
  // 当前类型是否支持为空
  boolean isNullable();
  RelDataType getComponentType();
  RelDataType getKeyType();
  RelDataType getValueType();
  // 当前类型的字符集编码
  Charset getCharset();
  // 当前类型的排序规则
  SqlCollation getCollation();
  // 获取该类型的 JDBC 精度(字段长度,例如:-4.75,precision 为 3)
  int getPrecision();
  // 获取该类型的范围(小数位数,例如:-4.75,scale 为 2)
  int getScale();
  // 获取 SQL 类型
  SqlTypeName getSqlTypeName();
  ... ...
}

RelDataTypeField 代表了struct类型中的一个字段.

public interface RelDataTypeField extends Map.Entry<String, RelDataType> {
  /* 返回字段对应的名称*/
  String getName();
  /*返回字段的Index*/
  int getIndex();
  /*返回字段的类型*/
  RelDataType getType();
  /** Returns true if this is a dynamic star field. */
  boolean isDynamicStar();
}

Calcite 对函数的定义是:接受参数并返回结果的命名表达式。函数通过 Schema 进行注册,可以通过 Schema#getFunctions获取函数,然后根据参数类型获取对应的函数。下面是 Function 接口声明:

public interface Function {   
  	// 获取函数参数
    List<FunctionParameter> getParameters();
}

Function 接口提供了 getParameters 获取函数参数的方法,Function 接口有 ScalarFunction、AggregateFunction、TableFunction 和 TableMarco 等几个主要的子接口。

  • ScalarFunction 对应标量函数,也就是函数返回的结果为一个标量- AggregateFunction 对应聚合函数,会将多个值聚合计算为一个标量返回。
  • TableFunction 和 TableMacro 都对应了表函数,会返回一个表,他们的区别是:
    • TableMacro 会在编译期间进行调用,编译期展开表达式允许 Calcite 实现更加强大的查询优化,例如我们可以对视图在编译期进行展开。
    • 相比于 TableMacro,TableFunction 则需要在执行阶段才能知道表的结果。

以上介绍了 Calcite Catalog 体系中涉及到的类及其主要作用,下面将介绍Calcite 如何注入Catalog。

以上篇Apache Calcite Quick Start中的quick start demo为例。

calcite-core包中的META-INF/services文件夹中有文件java.sql.Driver

org.apache.calcite.jdbc.Driver

JVM 启动时,会通过 ServiceLoader加载Driver服务。

public class Driver extends UnregisteredDriver {
  public static final String CONNECT_STRING_PREFIX = "jdbc:calcite:";

  protected final @Nullable Supplier<CalcitePrepare> prepareFactory;

  static {
    new Driver().register();
  }
  
  /** Creates a Driver. */
  public Driver() {
    this(null);
  }
  ... ...
} 

// org.apache.calcite.avatica.UnregisteredDriver
/**
 * Registers this driver with the driver manager.
 */
protected void register() {
  try {
    DriverManager.registerDriver(this);
  } catch (SQLException e) {
    System.out.println(
        "Error occurred while registering JDBC driver "
        + this + ": " + e.toString());
  }
}

当我们使用Connection connection = DriverManager.getConnection("jdbc:calcite:", info);获取连接时,DriverManager会使用上面注册的Driver 生成连接,最终生成的连接是CalciteConnectionImpl

// org.apache.calcite.avatica.UnregisteredDriver
public Connection connect(String url, Properties info) throws SQLException {
  if (!acceptsURL(url)) {
    return null;
  }
  final String prefix = getConnectStringPrefix();
  assert url.startsWith(prefix);
  final String urlSuffix = url.substring(prefix.length());
  final Properties info2 = ConnectStringParser.parse(urlSuffix, info);
  // CalciteConnectionImpl
  final AvaticaConnection connection =
      factory.newConnection(this, factory, url, info2);
  handler.onConnectionInit(connection);
  return connection;
}

CalciteConnectionImpl在初始化时会主动创建 rootSchema。

// CalciteConnectionImpl#<init>
this.rootSchema = 
        requireNonNull(rootSchema != null
            ? rootSchema
            : CalciteSchema.createRootSchema(true));

// CalciteSchema#createRootSchema
if (addMetadataSchema) {
  rootSchema.add("metadata", MetadataSchema.INSTANCE);
}

CalciteSchema.createRootSchema方法会根据参数传递的标识决定是否创建 metadata Schema,metadata Schema 会注册 COLUMNS、TABLES 等系统表以提供相关的查询。

注入 Catalog 有两种方式:

  • 编程API
  • Json配置文件

上面的 Demo 就是通过编程 API 方式注入 Catalog。

// 获取Calcite的根Schema
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 注册schema
final SchemaPlus rootSchema = calciteConnection.getRootSchema();
rootSchema.add("hr", new ReflectiveSchema(new HrSchema()));

官方CSV Catalog就是使用 Json 配置方式注入 Catalog。下面是CSV Catalog 配置文件的一个例子。

{
  "version": "1.0",
  "defaultSchema": "SALES", // 默认 Schema
  "schemas": [ // Schema 列表
    {
      "name": "SALES",
      "type": "custom", // 类型, MAP, JDBC 或者 CUSTOM
      "factory": "info.victorchu.calcite.tutorials.csv.CsvSchemaFactory", // custom 类型需要 Factory
      "operand": { // Factory 的参数
        "directory": "sales"
      }
    }
  ]
}

然后将配置文件路径放到 创建连接时的Properties属性中:

Properties info = new Properties();
info.put("model", "/resource/SALES.json");

org.apache.calcite.jdbc.Driver在创建完 Connection 后,会通过钩子函数注入 Catalog:


@Override protected Handler createHandler() {
  return new HandlerImpl() {
    @Override public void onConnectionInit(AvaticaConnection connection_)
        throws SQLException {
      final CalciteConnectionImpl connection =
          (CalciteConnectionImpl) connection_;
      super.onConnectionInit(connection);
      // 获取属性中的 model 参数
      final String model = model(connection);
      if (model != null) {
        // 注入 Catalog
        try {
          ModelHandler h = new ModelHandler(connection.getRootSchema(), model);
          String defaultName = h.defaultSchemaName();
          if (defaultName != null) {
            connection.setSchema(defaultName);
          }
        } catch (IOException e) {
          throw new SQLException(e);
        }
      }
      connection.init();
    }
    ... ...
  };
}

相关内容