
Kerberos Authentication for Big Data

Date: 2024-08-19    Source: Internet    Author: Anonymous

2. Common Kerberos Authentication Code
Kerberos authentication code:
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
 
public class KerberosTest {
    /**
     * Path to the hdfs-site.xml configuration file
     */
    private static String PATH_TO_HDFS_SITE_XML = KerberosTest.class.getClassLoader().getResource("hdfs-site.xml").getPath();
    /**
     * Path to the core-site.xml configuration file
     */
    private static String PATH_TO_CORE_SITE_XML = KerberosTest.class.getClassLoader().getResource("core-site.xml").getPath();
    /**
     * Path to the keytab file
     */
    private static String PATH_TO_KEYTAB = KerberosTest.class.getClassLoader().getResource("user.keytab").getPath();
    /**
     * Path to krb5.conf; it can also be specified at JVM startup via -Djava.security.krb5.conf=/etc/krb5.conf
     */
    private static String PATH_TO_KRB5_CONF = KerberosTest.class.getClassLoader().getResource("krb5.conf").getPath();
    /**
     * The assigned principal name. When krb5.conf is loaded, the short name alone suffices; otherwise the realm must be appended: develop@REALM
     */
    private static String PRINCIPAL_NAME = "develop";
    private FileSystem fs;
    private Configuration conf;
 
    /**
     * Initialize the Configuration
     */
    private void initConf() {
        conf = new Configuration();
 
        // Load the server-side configuration items on the client
        conf.addResource(new Path(PATH_TO_HDFS_SITE_XML));
        conf.addResource(new Path(PATH_TO_CORE_SITE_XML));
    }
 
    /**
     * Log in via Kerberos
     *
     * @throws IOException if login fails
     */
    private void login() throws IOException {
        // Decide from core-site.xml/hdfs-site.xml whether Kerberos authentication is enabled
        if (!"kerberos".equalsIgnoreCase(conf.get("hadoop.security.authentication"))) {
            return;
        }
        System.setProperty("java.security.krb5.conf", PATH_TO_KRB5_CONF);
 
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(PRINCIPAL_NAME, PATH_TO_KEYTAB);
    }
 
    /**
     * Initialize the file system
     *
     * @throws IOException if the FileSystem cannot be created
     */
    private void initFileSystem() throws IOException {
        fs = FileSystem.get(conf);
    }
}
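For completeness, here is a minimal main method exercising the three steps. It is a sketch to be added inside KerberosTest, and the /tmp directory is only illustrative:
 
    public static void main(String[] args) throws IOException {
        KerberosTest test = new KerberosTest();
        test.initConf();       // load core-site.xml / hdfs-site.xml
        test.login();          // keytab login (no-op when Kerberos is disabled)
        test.initFileSystem(); // obtain the authenticated FileSystem
 
        // List a directory to verify that authentication succeeded
        for (FileStatus status : test.fs.listStatus(new Path("/tmp"))) {
            System.out.println(status.getPath());
        }
    }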
3. Configuration File and Constant Definitions
3.1. Configuration file
hidcp.version=hidcp-hbase
#Kafka broker addresses
kafka.bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
#Topic for processed vehicle-passage data; normally no need to change
kafka.vehicle.topic=PASSDATAPROCESSED
#Two mechanisms are supported: (1) Kerberos: sasl.mechanism=GSSAPI (2) sasl.mechanism=SCRAM-SHA-512
kafka.sasl.opened=false
kafka.sasl.mechanism=SCRAM-SHA-512
kafka.sasl.kerberos.service.name=kafka
kafka.sasl.security.protocol=SASL_PLAINTEXT
kafka.sasl.java.security.auth.login.config=kafka-client-jaas.conf
#HBASE-KERBEROS-COMMON
#Default location: /etc/krb5.conf
kerberos.java.security.krb5.conf=/etc/krb5.conf
#Default location: /etc/user.keytab
kerberos.keytab.path=/etc/user.keytab
#Principal name used for Kerberos login
kerberos.client.principal.name=kafka
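The code in the sections below reads these key/value pairs through a `properties` object. A minimal loading sketch, assuming the file is named hidcp.properties and sits on the classpath (the file name is an assumption):
 
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
 
public class ConfigLoader {
    /** Loads the configuration file from the classpath; the file name is an assumption. */
    public static Properties load() throws IOException {
        Properties properties = new Properties();
        try (InputStream in = ConfigLoader.class.getClassLoader().getResourceAsStream("hidcp.properties")) {
            properties.load(in);
        }
        return properties;
    }
}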
3.2. Constant definitions
public class Constant {
 
    public static final String HIDCP_VERSION = "hidcp.version";
 
    public static final String SASL_DEFAULT = "false";
 
    public static final String KAFKA_BOOTSTRAP_SERVERS = "kafka.bootstrap.servers";
    public static final String KAFKA_VEHICLE_TOPIC = "kafka.vehicle.topic";
    public static final String KAFKA_SASL_OPENED = "kafka.sasl.opened";
    public static final String KAFKA_SASL_MECHANISM = "kafka.sasl.mechanism";
    public static final String KAFKA_SECURITY_PROTOCOL = "kafka.sasl.security.protocol";
    public static final String KAFKA_SASL_KERBEROS_SERVICE_NAME = "kafka.sasl.kerberos.service.name";
    public static final String KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG = "kafka.sasl.java.security.auth.login.config";
 
    public static final String VEHICLE_DEFAULT_TOPIC = "PASSDATAPROCESSED";
 
    
    public static final String JAVA_SECURITY_KRB5_CONF = "java.security.krb5.conf";
    public static final String JAVA_SECURITY_AUTH_LOGIN_CONFIG = "java.security.auth.login.config";
    public static final String KERBEROS_JAVA_SECURITY_KRB5_CONF = "kerberos.java.security.krb5.conf";
 
    public static class KafkaConfig{
        public static final String SASL_MECHANISM = "sasl.mechanism";
        public static final String SECURITY_PROTOCOL = "security.protocol";
        public static final String SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
 
    }
}

4. HBase Kerberos Authentication
HBase authentication code:
org.apache.hadoop.hbase.client.Connection conn;
 
org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.addResource("core-site.xml");
config.addResource("hdfs-site.xml");
config.addResource("hbase-site.xml");
//config.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"));
//config.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"));
if ("kerberos".equals(config.get("hadoop.security.authentication"))) {
    System.setProperty("java.security.krb5.conf",
        properties.getProperty("java.security.krb5.conf.path", "/etc/krb5.conf"));
    UserGroupInformation.setConfiguration(config);
    // Log in and authenticate via keytab
    UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.client.principal.name"),
        properties.getProperty("kerberos.keytab.path", "/etc/user.keytab"));
}
conn = HBaseUtils.getConnection(config);
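HBaseUtils is not shown in the original; a minimal sketch of what getConnection presumably does, built on the standard ConnectionFactory API:
 
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
 
public class HBaseUtils {
    /** Creates an HBase connection from the (already authenticated) configuration. */
    public static Connection getConnection(Configuration config) throws IOException {
        return ConnectionFactory.createConnection(config);
    }
}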


5. Kafka Kerberos Authentication


5.1. Flink Kafka source factory class
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 
 
import java.util.Properties;
 
import static com.hisense.hidcp.indexes.algo.common.Constant.*;
import static com.hisense.hidcp.indexes.algo.common.Constant.KafkaConfig.*;
 
public class KafkaSourceFactory {
    public static KafkaSource<String> kafkaSourceFactory(String clientId, Properties properties) {
 
        KafkaSourceBuilder<String> kafkaSourceBuilder =
            KafkaSource.<String>builder().setBootstrapServers(properties.getProperty(KAFKA_BOOTSTRAP_SERVERS))
                .setTopics(properties.getProperty(KAFKA_VEHICLE_TOPIC, VEHICLE_DEFAULT_TOPIC))
                .setGroupId(properties.getProperty(HIDCP_VERSION) + clientId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED, SASL_DEFAULT))) {
            System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG,
                properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG));
            System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF));
            Properties saslProperties = new Properties();
 
            saslProperties.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM));
            saslProperties.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL));
            saslProperties.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME));
            kafkaSourceBuilder.setProperties(saslProperties);
        }
        return kafkaSourceBuilder.build();
    }
}
Usage:

KafkaSource<String> stringKafkaSource = KafkaSourceFactory.kafkaSourceFactory("city-hbase", properties);
DataStream<String> dataStream = env.fromSource(stringKafkaSource, WatermarkStrategy.noWatermarks(), "PASS_DATA_SOURCE");
Spring Boot

Reference: Spring Boot Kafka Integration Tutorial - 掘金 (juejin.cn)

Producer:

@Configuration
public class KafkaProducerConfig {
 
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
 
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
 
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
 
        // `properties` holds the configuration from section 3.1, loaded at startup (loading code not shown)
        if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED, SASL_DEFAULT))) {
            System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG,
                properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG));
            System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF));
 
            props.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM));
            props.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL));
            props.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME));
        }
        return props;
    }
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
 
}
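A minimal producer usage sketch; the service class name is hypothetical and the topic is the default from section 3.1:
 
@Service
public class PassDataProducer {
 
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
 
    /** Sends a message to the processed vehicle-passage topic. */
    public void send(String message) {
        kafkaTemplate.send("PASSDATAPROCESSED", message);
    }
}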


Consumer:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
 
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
 
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
 
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
 
        // `properties` holds the configuration from section 3.1, loaded at startup (loading code not shown)
        if (!SASL_DEFAULT.equals(properties.getProperty(KAFKA_SASL_OPENED, SASL_DEFAULT))) {
            System.setProperty(JAVA_SECURITY_AUTH_LOGIN_CONFIG,
                properties.getProperty(KAFKA_SASL_SECURITY_AUTH_LOGIN_CONFIG));
            System.setProperty(JAVA_SECURITY_KRB5_CONF, properties.getProperty(KERBEROS_JAVA_SECURITY_KRB5_CONF));
 
            props.put(SASL_MECHANISM, properties.getProperty(KAFKA_SASL_MECHANISM));
            props.put(SECURITY_PROTOCOL, properties.getProperty(KAFKA_SECURITY_PROTOCOL));
            props.put(SASL_KERBEROS_SERVICE_NAME, properties.getProperty(KAFKA_SASL_KERBEROS_SERVICE_NAME));
        }
 
        return props;
    }
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
 
}
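A minimal consumer usage sketch; the listener class name is hypothetical and the topic is the default from section 3.1:
 
@Component
public class PassDataListener {
 
    /** Consumes messages from the processed vehicle-passage topic. */
    @KafkaListener(topics = "PASSDATAPROCESSED")
    public void onMessage(String message) {
        System.out.println("received: " + message);
    }
}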

5.2. The kafka-client-jaas.conf file
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/user.keytab"
   storeKey=true
   useTicketCache=false
   principal="kafka-client@ABC.COM";
};
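The entry above covers the Kerberos (GSSAPI) mechanism. For the SCRAM-SHA-512 alternative mentioned in section 3.1, the JAAS file would instead use the ScramLoginModule; the username and password below are placeholders:
 
KafkaClient {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="kafka-client"
   password="******";
};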

6. Hive Kerberos Authentication
6.1. jdbc-url


jdbc:hive2://*****:10000/database;principal=hive/****@DWSP.COM
Here, principal is the HiveServer2 service principal configured in hive-site.xml (the hive.server2.authentication.kerberos.principal property).

6.2. Hive authentication code (same pattern as HBase)
org.apache.hadoop.conf.Configuration config = new org.apache.hadoop.conf.Configuration();
config.addResource("core-site.xml");
config.addResource("hdfs-site.xml");
config.addResource("hive-site.xml");
if ("kerberos".equals(config.get("hadoop.security.authentication"))) {
    System.setProperty("java.security.krb5.conf",
        properties.getProperty("java.security.krb5.conf.path", "/etc/krb5.conf"));
    UserGroupInformation.setConfiguration(config);
    // Log in and authenticate via keytab
    UserGroupInformation.loginUserFromKeytab(properties.getProperty("kerberos.client.principal.name"),
        properties.getProperty("kerberos.keytab.path", "/etc/user.keytab"));
}
Class.forName("org.apache.hive.jdbc.HiveDriver");
java.sql.Connection conn = DriverManager.getConnection(properties.getProperty("hive.JDBC.url"));
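A minimal query sketch over the resulting connection; the SQL statement is illustrative:
 
try (java.sql.Statement stmt = conn.createStatement();
     java.sql.ResultSet rs = stmt.executeQuery("SHOW TABLES")) {
    while (rs.next()) {
        System.out.println(rs.getString(1));
    }
}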
