时间:2024-08-19 点击: 次 来源:网络 作者:佚名 - 小 + 大
2. Kerberos认证通用代码 kerberos认证代码 import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; public class KerberosTest { /** * 获取hdfs配置文件路径 */ private static String PATH_TO_HDFS_SITE_XML = KerberosTest.class.getClassLoader().getResource("hdfs-site.xml").getPath(); /** * 获取core配置文件路径 */ private static String PATH_TO_CORE_SITE_XML = KerberosTest.class.getClassLoader().getResource("core-site.xml").getPath(); /** * 获取keytab文件路径 */ private static String PATH_TO_KEYTAB = KerberosTest.class.getClassLoader().getResource("user.keytab").getPath(); /** * 获取conf文件路径,也可以在启动-Djava.security.krb5.conf=/etc/krb5.conf 指定 */ private static String PATH_TO_KRB5_CONF = KerberosTest.class.getClassLoader().getResource("krb5.conf").getPath(); /** * 分配的用户名,如果加载krb5文件可以只指定用户名,否则需要加域:develop@域 */ private static String PRNCIPAL_NAME = "develop"; private FileSystem fs; private Configuration conf; /** * initialize Configuration */ private void initConf() { conf = new Configuration(); // 客户端同步服务端的配置项 conf.addResource(new Path(PATH_TO_HDFS_SITE_XML)); conf.addResource(new Path(PATH_TO_CORE_SITE_XML)); } /** * 登录 * * @throws IOException 登录失败 */ private void login() throws IOException { //此处通过core-site和hdfs-site文件判断是否有kerberos权限 if (!"kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) { return; } System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF); UserGroupInformation.setConfiguration(conf); UserGroupInformation.loginUserFromKeytab(PRNCIPAL_NAME, PATH_TO_KEYTAB); } /** * 初始化文件系统 * * @throws IOException */ private void initFileSystem() throws IOException { fs = FileSystem.get(conf); } } 3. 配置文件与常量定义 3.1. 配置文件 hidcp.version=hidcp-hbase #kafka-broker地址 kafka.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 #kafka过车处理后TOPIC,一般不需修改 kafka.vehicle.topic=PASSDATAPROCESSED #支持两种:(1) kerberos认证 sasl.mechanism = GSSAPI (2)sasl.mechanism = SCRAM-SHA-512 kafka.sasl.opened = false kafka.sasl.mechanism=SCRAM-SHA-512 kafka.sasl.kerberos.service.name=kafka kafka.sasl.security.protocol=SASL_PLAINTEXT kafka.sasl.java.security.auth.login.config=kafka-client-jaas.conf #HBASE-KERBEROS-COMMON #默认位置/etc/krb5.conf kerberos.java.security.krb5.conf=/etc/krb5.conf #默认位置/etc/user.keytab kerberos.keytab.path=/etc/user.keytab #kerberos提供登录用户名 kerberos.client.principal.name=kafka 3.2. 常量定义 public class Constant { public static final String HIDCP_VERSION = "hidcp.version"; public static final String SASL_DEFAULT = "false"; public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers"; public static final String KAFKA_VEHICLE_TOPIC = "kafka.vehicle.topic"; public static final String KAFKA_SASL_OPENED = "kafka.sasl.opened"; public static final String KAFKA_SASL_MECHANISM = "kafka.sasl.mechanism"; public static final String KAFKA_SECURITY_PROTOCOL = "kafka.sasl.security.protocol"; public static final String KAFKA_SASL_KERBEROS_SERVICE_NAME = "kafka.sasl.kerberos.service.name"; public static final String KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG ="kafka.sasl.java.security.auth.login.config"; public static final String VEHICLE_DEFAULT_TOPIC = "PASSDATAPROCESSED"; public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf"; public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config"; public static final String KERBEROS_JAVA_SECURITY_KRB5_CONF = "kerberos.java.security.krb5.conf"; public static class KafkaConfig{ public static final String SASL_MECHANISM = "sasl.mechanism"; public static final String SECURITY_PROTOCOL = "security.protocol"; public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name"; } } 4. Hbase-Kerberos认证 hbase org.apache.hadoop.hbase.client.Connection conn; org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create(); config.addResource("core-site.xml"); config.addResource("hdfs-site.xml"); config.addResource("hbase-site.xml"); //config.set("hbase.zookeeper.quorum",properties.getProperty("hbase.zookeeper.quorum")); //config.set("hbase.zookeeper.property.clientPort",properties.getProperty("hbase.zookeeper.property.clientPort")); if ("kerberos".equals(config.get("hadoop.security.authentication"))){ System.setProperty("java.security.krb5.conf", properties.getProperty("java.security.krb5.conf.path", "/etc/krb5.conf")); UserGroupInformation.setConfiguration(config); //登陆认证 UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.client.principal.name"), properties.getProperty("kerberos.keytab.path", "/etc/user.keytab")); } conn = HBaseUtils.getConnection(config); 5. Kafka-kerberos认证 5.1. Flink-kafka-source工厂类 import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.KafkaSourceBuilder; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import java.util.Properties; import static com.hisense.hidcp.indexes.algo.common.Constant.*; import static com.hisense.hidcp.indexes.algo.common.Constant.KafkaConfig.*; public class KafkaSourceFactory { public static <String> KafkaSource<String> kafkaSourceFactory(String clientId, Properties properties) { KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.<String>builder().setBootstrapServers(properties.getProperty(KAFKA_BOOTSTRAP_SERVERS)) .setTopics(properties.getProperty(KAFKA_VEHICLE_TOPIC, VEHICLE_DEFAULT_TOPIC)) .setGroupId(properties.getProperty(HIDCP_VERSION) + clientId) .setStartingOffsets(OffsetsInitializer.latest()) .setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") .setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED))) { System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG)); System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF)); Properties saslProperties = new Properties(); saslProperties.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM)); saslProperties.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL)); saslProperties.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME)); kafkaSourceBuilder.setProperties(saslProperties); } return kafkaSourceBuilder.build(); } } 使用方式 KafkaSource<String> stringKafkaSource = KafkaSourceFactory.kafkaSourceFactory("city-hbase", properties); DataStream<String> dataStream = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "PASS_DATA_SOURCE"); springboot 参考:Spring Boot 整合 Kafka 教程 - 掘金 (juejin.cn) 生产者: @Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty(KAFKA_BOOTSTRAP_SERVERS)); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED))) { System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG)); System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF)); //Properties saslProperties = new Properties(); props.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM)); props.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL)); props.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME)); } return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } } 消费者: @Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getProperty(KAFKA_BOOTSTRAP_SERVERS)); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED))) { System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG, properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG)); System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF)); //Properties saslProperties = new Properties(); props.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM)); props.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL)); props.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME)); } return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } } 5.2. kafka-client-jaas.conf文件 KafkaClient { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true keyTab="/etc/user.keytab" storeKey=true useTicketCache=false principal="kafka-client@ABC.COM"; }; 6. Hive kerberos认证 6.1. jdbc-url jdbc:hive2://*****:10000/database;principal=hive/****@DWSP.COM 其中,principal为hive-site.xml文件中: 6.2. hive认证代码与hbase一致 org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();; config.addResource("core-site.xml"); config.addResource("hdfs-site.xml"); config.addResource("hive-site.xml"); if ("kerberos".equals(config.get("hadoop.security.authentication"))){ System.setProperty("java.security.krb5.conf", properties.getProperty("java.security.krb5.conf.path", "/etc/krb5.conf")); UserGroupInformation.setConfiguration(config); //登陆认证 UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.client.principal.name"), properties.getProperty("kerberos.keytab.path", "/etc/user.keytab")); } Class.forName("org.apache.hive.jdbc.HiveDriver"); java.sql.Connection conn = DriverManager.getConnection( props.getProperty("hive.JDBC.url")); |
上一篇:大数据国产化迁移通用方案