Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析
更新时间:2021年08月02日 16:02:20 作者:CNBLOG
这篇文章主要介绍了Java使用pulsar-flink-connector读取pulsar catalog元数据代码剖析,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下
简介
通过 pulsar-flink-connector 读取到 Apache pulsar 中的namespaces、topics的元数据信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pulsar-flink-connector-2.11-1.12</artifactId> <version>2.7.3</version> </dependency> <!-- JAR repositories --> <repositories> <repository> <id>central</id> <layout>default</layout> <url>https://repo1.maven.org/maven2</url> </repository> <repository> <id>bintray-streamnative-maven</id> <name>bintray</name> <url>https://dl.bintray.com/streamnative/maven</url> </repository> </repositories>
CODE
使用PulsarMetadataReader获取元数据
package com.levi.demo; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.impl.auth.AuthenticationToken; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.schema.SchemaInfo; import org.apache.pulsar.common.schema.SchemaType; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; /** * Test. * * @author levi * @version 1.0 **/ public class Test { public static void main(String[] args) { final ClientConfigurationData configurationData = new ClientConfigurationData(); configurationData.setServiceUrl("pulsar://127.0.0.1:6650"); //Your Pulsar Token final AuthenticationToken token = new AuthenticationToken( "eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx"); configurationData.setAuthentication(token); try (final PulsarMetadataReader reader = new PulsarMetadataReader("http://127.0.0.1:8443", configurationData, "", new HashMap(), -1, -1)) { //获取namespaces final List<String> namespaces = reader.listNamespaces(); System.out.println("namespaces: " + namespaces.toString()); for (final String namespace : namespaces) { //获取Topics final List<String> topics = reader.getTopics(namespace); System.out.println("topic: " + topics.toString()); for (String topic : topics) { //获取字段SchemaInfo final SchemaInfo schemaInfo = reader.getPulsarSchema(topic); final String name = schemaInfo.getName(); System.out.println("SchemaName:" + name); //topicName final SchemaType type = schemaInfo.getType(); System.out.println("SchemaType:" + type.toString());// "JSON"... final Map<String, String> properties = schemaInfo.getProperties(); System.out.println(properties); final String schemaDefinition = schemaInfo.getSchemaDefinition(); System.out.println(schemaDefinition); // Field info. } } } catch (IOException | PulsarAdminException e) { e.printStackTrace(); } } }
到此这篇关于Java使用pulsar-flink-connector读取pulsar catalog元数据的文章就介绍到这了,更多相关Java读取pulsar catalog元数据内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
Java 通过手写分布式雪花SnowFlake生成ID方法详解
SnowFlake是twitter公司内部分布式项目采用的ID生成算法,开源后广受国内大厂的好评。由这种算法生成的ID,我们就叫做SnowFlakeID,下面我们来详细看看2022-04-04Vue结合Springboot实现用户列表单页面(前后端分离)
本文主要介绍了Vue结合Springboot实现用户列表单页面,可以实现简单的查询,删除,修改,和添加用户信息功能,具有一定的参考价值,感兴趣的小伙伴们可以参考一下2021-07-07
最新评论