1 Paimon With Hive
Doc
First, start a Hive cluster using docker-hive:
cd docker-hive
docker-compose up -d
Second, download the paimon-hive-connector jar for the matching Hive version:
wget -O paimon-hive-connector-2.3-0.8.0.jar 'https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-hive-connector-2.3/0.8.0/paimon-hive-connector-2.3-0.8.0.jar'
Then, copy the jar into Hive's container and restart it:
docker exec docker-hive-hive-server-1 mkdir -p /opt/hive/auxlib
docker cp paimon-hive-connector-2.3-0.8.0.jar docker-hive-hive-server-1:/opt/hive/auxlib
docker restart docker-hive-hive-server-1
Finally, test it:
docker exec -it docker-hive-hive-server-1 bash
/opt/hive/bin/beeline -u jdbc:hive2://localhost:10000

CREATE TABLE `hive_paimon_test_table` (
  `a` int COMMENT 'The a field',
  `b` string COMMENT 'The b field'
) STORED BY 'org.apache.paimon.hive.PaimonStorageHandler';

INSERT INTO hive_paimon_test_table (a, b) VALUES (1, '1'), (2, '2');
SELECT * FROM hive_paimon_test_table;
2 Paimon With Spark
Doc
First, start a Spark cluster using docker-spark:
cd docker-spark
docker-compose up -d
Second, download the paimon-spark jar for the matching Spark version:
wget -O paimon-spark-3.3-0.8.0.jar 'https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-spark-3.3/0.8.0/paimon-spark-3.3-0.8.0.jar'
Then, copy the jar into Spark's container:
docker cp paimon-spark-3.3-0.8.0.jar spark-master:/spark/jars
Finally, test it:
docker exec -it spark-master bash

/spark/bin/spark-sql --jars /spark/jars/paimon-spark-3.3-0.8.0.jar \
  --conf spark.sql.catalog.my_test_paimon=org.apache.paimon.spark.SparkCatalog \
  --conf spark.sql.catalog.my_test_paimon.warehouse=file:/spark/paimon \
  --conf spark.sql.extensions=org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions

USE my_test_paimon;
USE default;

create table my_table (
  k int,
  v string
) tblproperties (
  'primary-key' = 'k'
);

INSERT INTO my_table VALUES (1, 'Hi'), (2, 'Hello');
SELECT * FROM my_table;

USE spark_catalog;
USE default;
SELECT * FROM my_test_paimon.default.my_table;
3 Paimon With Flink
Doc
First, start a Flink cluster following Flink Docker Setup:
cat > docker-compose.yml << 'EOF'
version: "2.2"
services:
  jobmanager:
    image: apache/flink:1.19-java8
    container_name: jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
  taskmanager:
    image: apache/flink:1.19-java8
    container_name: taskmanager
    depends_on:
      - jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
EOF

docker-compose up -d
Second, download the paimon-flink jars for the matching Flink version:
wget -O paimon-flink-1.19-0.8.0.jar 'https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-1.19/0.8.0/paimon-flink-1.19-0.8.0.jar'
wget -O paimon-flink-action-0.8.0.jar 'https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-flink-action/0.8.0/paimon-flink-action-0.8.0.jar'
wget -O flink-shaded-hadoop-2-uber-2.8.3-10.0.jar 'https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar'
Then, copy the jar files into Flink's containers:
containers=("jobmanager" "taskmanager")
for container in ${containers[@]}
do
  docker cp paimon-flink-1.19-0.8.0.jar ${container}:/opt/flink/lib
  docker cp paimon-flink-action-0.8.0.jar ${container}:/opt/flink/lib
  docker cp flink-shaded-hadoop-2-uber-2.8.3-10.0.jar ${container}:/opt/flink/lib
done
Finally, test it:
docker exec -it jobmanager /opt/flink/bin/sql-client.sh

CREATE CATALOG my_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'file:/tmp/paimon'
);

USE CATALOG my_catalog;

-- create a word count table
CREATE TABLE word_count (
    word STRING PRIMARY KEY NOT ENFORCED,
    cnt BIGINT
);

-- create a word data generator table
CREATE TEMPORARY TABLE word_table (
    word STRING
) WITH (
    'connector' = 'datagen',
    'fields.word.length' = '1'
);

-- paimon requires checkpoint interval in streaming mode
SET 'execution.checkpointing.interval' = '10 s';

-- write streaming data to dynamic table
INSERT INTO word_count SELECT word, COUNT(*) FROM word_table GROUP BY word;

-- use tableau result mode
SET 'sql-client.execution.result-mode' = 'tableau';

-- switch to batch mode
RESET 'execution.checkpointing.interval';
SET 'execution.runtime-mode' = 'batch';

-- olap query the table
SELECT * FROM word_count;

-- switch to streaming mode
SET 'execution.runtime-mode' = 'streaming';

-- track the changes of table and calculate the count interval statistics
SELECT `interval`, COUNT(*) AS interval_cnt FROM
  (SELECT cnt / 10000 AS `interval` FROM word_count)
GROUP BY `interval`;
4 Paimon With Trino
Doc
First, assume you have already started a Trino container named trino and that it can reach an HDFS cluster; this can be set up by following Trino-Trial:
git clone https://github.com/apache/paimon-trino.git
cd paimon-trino/paimon-trino-427
mvn clean install -DskipTests

cd target
mkdir plugin
tar -zxf paimon-trino-427-*.tar.gz -C plugin

docker cp plugin/paimon trino:/usr/lib/trino/plugin
docker exec -it trino bash -c 'echo -e "connector.name=paimon\nmetastore=filesystem\nwarehouse=hdfs://namenode:8020/user/paimon" > /etc/trino/catalog/paimon.properties'
docker restart trino

docker exec -it trino trino --catalog paimon
How to clean up:
docker exec -it --user root trino bash -c 'rm -f /etc/trino/catalog/paimon.properties'
docker exec -it --user root trino bash -c 'rm -rf /usr/lib/trino/plugin/paimon'
5 Options
org.apache.paimon.CoreOptions
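CoreOptions is the class that defines the table-level options (bucket count, file format, snapshot retention, and so on). As a hedged illustration, the Java sketch below shows one way to reference those keys from code instead of hard-coding strings; the chosen options and values are arbitrary examples, not recommendations:

import org.apache.paimon.CoreOptions;

import java.util.HashMap;
import java.util.Map;

public class CoreOptionsDemo {

    public static void main(String[] args) {
        // Table options are plain key/value strings; CoreOptions exposes the keys,
        // which avoids typos compared to hard-coding 'bucket', 'file.format', ...
        Map<String, String> tableOptions = new HashMap<>();
        tableOptions.put(CoreOptions.BUCKET.key(), "4");
        tableOptions.put(CoreOptions.FILE_FORMAT.key(), "orc");

        tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}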
6 Features
Changelog Producer
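The changelog producer decides how a primary-key table materializes a changelog for downstream streaming readers (values include 'none', 'input', 'lookup', and 'full-compaction'). As a minimal, hedged sketch, the Java snippet below creates a table with 'changelog-producer' = 'lookup' through the catalog API; the catalog instance, database, table, and column names are illustrative assumptions:

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataTypes;

public class ChangelogProducerDemo {

    // Create a primary-key table whose changelog is produced by the 'lookup' strategy.
    public static void createTable(Catalog catalog) throws Exception {
        Schema schema = Schema.newBuilder()
                .column("word", DataTypes.STRING())
                .column("cnt", DataTypes.BIGINT())
                .primaryKey("word")
                .option("changelog-producer", "lookup")
                .build();
        catalog.createTable(Identifier.create("default", "word_count_cdc"), schema, false);
    }
}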
7 SDK Demos
7.1 Kerberos
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

public class KerberosDemo {

    public static void main(String[] args) throws Exception {
        System.setProperty("sun.security.krb5.debug", "true");

        String namenode = args[0];
        String user = args[1];
        String keytabPath = args[2];
        String kerberosConfigPath = args[3];

        System.setProperty("java.security.krb5.conf", kerberosConfigPath);

        // Enable Kerberos authentication for the Hadoop client.
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("hadoop.security.authentication", "kerberos");
        hadoopConf.set("hadoop.security.authorization", "true");

        // Log in against the KDC with the given principal and keytab.
        UserGroupInformation.setConfiguration(hadoopConf);
        UserGroupInformation.loginUserFromKeytab(user, keytabPath);

        // Point Paimon at a filesystem metastore whose warehouse lives on HDFS.
        Options options = new Options();
        options.set(CatalogOptions.METASTORE, "filesystem");
        options.set(
                CatalogOptions.WAREHOUSE,
                String.format("hdfs://%s:8020/users/paimon/warehouse", namenode));

        CatalogContext context = CatalogContext.create(options);
        Catalog catalog = CatalogFactory.createCatalog(context);
        System.out.println("Catalog created successfully: " + catalog);
    }
}
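If the login works, any metadata call that reaches HDFS should now succeed. As a small, hedged follow-up (continuing inside main above), listing databases and then closing the catalog is one way to verify this:

// Continuing inside KerberosDemo.main after the catalog is created:
// listDatabases() forces a round trip to HDFS, so it exercises the
// Kerberos credentials established above.
System.out.println("Databases: " + catalog.listDatabases());
catalog.close();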
8 Issue
8.1 Timeout waiting for connection from pool
Remember to close reader and writer instances when you are done with them; otherwise the underlying connections may not be returned to the pool. The S3 catalog is particularly prone to this issue.
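For example, when reading through the Paimon Java API, the RecordReader holds resources until it is closed, and a try-with-resources block guarantees the close even if the scan fails. The sketch below assumes you already hold a Table instance (for example from Catalog.getTable); the printed output is only illustrative:

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;

import java.util.List;

public class CloseReaderDemo {

    // Scan a table and make sure the reader is always closed, even on failure.
    public static void scan(Table table) throws Exception {
        ReadBuilder readBuilder = table.newReadBuilder();
        List<Split> splits = readBuilder.newScan().plan().splits();
        TableRead read = readBuilder.newRead();

        // RecordReader is Closeable; try-with-resources returns the underlying
        // connections (e.g. S3 HTTP connections) to the pool.
        try (RecordReader<InternalRow> reader = read.createReader(splits)) {
            reader.forEachRemaining(System.out::println);
        }
    }
}

The write side behaves the same way: BatchTableWrite and BatchTableCommit are AutoCloseable and should likewise be closed (or wrapped in try-with-resources) once the commit has finished.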