Cómo leer de hbase usando spark


El siguiente código leerá desde la hbase, luego lo convertirá a la estructura json y lo convertirá a schemaRDD , pero el problema es que estoy using List para almacenar la cadena json y luego pasarla a JavaRDD, para datos de aproximadamente 100 GB el maestro se cargará con datos en memoria. ¿Cuál es la forma correcta de cargar los datos de hbase a continuación,realizar la manipulación, a continuación, convertir a JavaRDD.

package hbase_reader;


import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;

import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;

import com.google.common.collect.Lists;

public class hbase_reader {

    public static void main(String[] args) throws IOException, ParseException {

        List<String> jars = Lists.newArrayList("");

        SparkConf spconf = new SparkConf();
        spconf.setMaster("local[2]");
        spconf.setAppName("HBase");
        //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1");
        spconf.setJars(jars.toArray(new String[jars.size()]));
        JavaSparkContext sc = new JavaSparkContext(spconf);
        //spconf.set("spark.executor.memory", "1g");

        JavaSQLContext jsql = new JavaSQLContext(sc);


        HBaseConfiguration conf = new HBaseConfiguration();
        String tableName = "HBase.CounData1_Raw_Min1";
        HTable table = new HTable(conf,tableName);
        try {

            ResultScanner scanner = table.getScanner(new Scan());
            List<String> jsonList = new ArrayList<String>();

            String json = null;

            for(Result rowResult:scanner) {
                json = "";
                String rowKey  = Bytes.toString(rowResult.getRow());
                for(byte[] s1:rowResult.getMap().keySet()) {
                    String s1_str = Bytes.toString(s1);

                    String jsonSame = "";
                    for(byte[] s2:rowResult.getMap().get(s1).keySet()) {
                        String s2_str = Bytes.toString(s2);
                        for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) {
                            String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3));
                            jsonSame += "\""+s2_str+"\":"+s3_str+",";
                        }
                    }
                    jsonSame = jsonSame.substring(0,jsonSame.length()-1);
                    json += "\""+s1_str+"\""+":{"+jsonSame+"}"+",";
                }
                json = json.substring(0,json.length()-1);
                json = "{\"RowKey\":\""+rowKey+"\","+json+"}";
                jsonList.add(json);
            }

            JavaRDD<String> jsonRDD = sc.parallelize(jsonList);

            JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD);




            System.out.println(schemaRDD.take(2));

        } finally {
            table.close();
        }

    }

}
Author: smola, 2014-07-30

4 answers

Un Ejemplo Básico para Leer los datos de HBase usando Spark (Scala), También puede escribir esto en Java:

import org.apache.hadoop.hbase.client.{HBaseAdmin, Result}
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

import org.apache.spark._

object HBaseRead {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val conf = HBaseConfiguration.create()
    val tableName = "table1"

    System.setProperty("user.name", "hdfs")
    System.setProperty("HADOOP_USER_NAME", "hdfs")
    conf.set("hbase.master", "localhost:60000")
    conf.setInt("timeout", 120000)
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val admin = new HBaseAdmin(conf)
    if (!admin.isTableAvailable(tableName)) {
      val tableDesc = new HTableDescriptor(tableName)
      admin.createTable(tableDesc)
    }

    val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    println("Number of Records found : " + hBaseRDD.count())
    sc.stop()
  }
}

ACTUALIZADO -2016

A partir de Spark 1.0.x+, ahora también puede usar el conector Spark-HBase:

Maven Dependencia para Incluir:

<dependency>
  <groupId>it.nerdammer.bigdata</groupId>
  <artifactId>spark-hbase-connector_2.10</artifactId>
  <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x
</dependency>

Y encontrar un código de ejemplo a continuación para el mismo:

import org.apache.spark._
import it.nerdammer.spark.hbase._

object HBaseRead extends App {
    val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]")
    sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //e.g. 192.168.1.1 or localhost or your hostanme
    val sc = new SparkContext(sparkConf)

    // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then:

    val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document")
    .select("DocID", "Title").inColumnFamily("SMPL")

    println("Number of Records found : " + docRdd .count())
}

ACTUALIZADO-2017

A partir de la chispa 1.6.x+, ahora también puede usar el conector SHC (usuarios de Hortonworks o HDP):

Maven Dependencia a Incluir :

    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc</artifactId>
        <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x
    </dependency>

La principal ventaja de usar este conector es que tiene flexibilidad en la definición del Esquema y no necesita parámetros codificados como en nerdammer/spark-hbase-connector. También recuerde que es compatible con Spark 2.x así que este conector es bastante flexible y proporciona soporte de extremo a extremo en Problemas y PRs.

Encuentre la siguiente ruta del repositorio para el readme y las muestras más recientes:

Conector Hortonworks Spark HBase

También puede convertir este RDD a DataFrames y ejecutar SQL sobre él o puede asignar estos Dataset o DataFrames a Java definido por el usuario Pojo o clases Case. Funciona genial.

Por favor, comente a continuación si necesita algo más.

 40
Author: Murtaza Kanchwala,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-03-08 07:00:53

Prefiero leer desde hbase y hacer la manipulación json todo en spark.
Spark proporciona JavaSparkContext.Función newAPIHadoopRDD para leer datos del almacenamiento de hadoop, incluida la HBase. Tendrá que proporcionar la configuración de HBase, el nombre de la tabla y el escaneo en el parámetro de configuración y el formato de entrada de la tabla y su clave-valor

Puede usar table input format class y su parámetro job para proporcionar el nombre de la tabla y escanear configuración

Ejemplo:

conf.set(TableInputFormat.INPUT_TABLE, "tablename");
JavaPairRDD<ImmutableBytesWritable, Result> data = 
jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);

Entonces puedes hacer la manipulación json en spark. Dado que spark puede hacer recálculo cuando la memoria está llena, solo cargará los datos necesarios para la parte de recálculo (cmiiw) para que no tenga que preocuparse por el tamaño de los datos

 10
Author: Averman,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2014-09-23 09:53:12

Solo para agregar un comentario sobre cómo agregar scan:

TableInputFormat tiene los siguientes atributos:

  1. SCAN_ROW_START
  2. SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey");
conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
 6
Author: Zhang Kan,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2015-06-08 06:36:27

Dado que la pregunta no es nueva, hay algunas otras alternativas por ahora:

No sé mucho sobre el primer proyecto, pero parece que no es compatible con Spark 2.x. Sin embargo, tiene un rico soporte a nivel RDD para Spark 1.6.x.

Spark-on-HBase, por otro lado, tiene ramas para Spark 2.0 y próxima Spark 2.1. Este proyecto es muy prometedor, ya que se centra en las API de Dataset/DataFrame. Bajo el capó, implementa la API estándar de Spark Datasource y aprovecha el motor Spark Catalyst para la optimización de consultas. Los desarrolladores afirman aquí que es capaz de podar particiones, podar columnas, empujar predicados y lograr la localidad de datos.

Un ejemplo simple, que utiliza el artefacto com.hortonworks:shc:1.0.0-2.0-s_2.11 de este repositorio y Spark 2.0.2, se presenta a continuación:

case class Record(col0: Int, col1: Int, col2: Boolean)

val spark = SparkSession
  .builder()
  .appName("Spark HBase Example")
  .master("local[4]")
  .getOrCreate()

def catalog =
  s"""{
      |"table":{"namespace":"default", "name":"table1"},
      |"rowkey":"key",
      |"columns":{
      |"col0":{"cf":"rowkey", "col":"key", "type":"int"},
      |"col1":{"cf":"cf1", "col":"col1", "type":"int"},
      |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"}
      |}
      |}""".stripMargin

val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0))

// write
spark
  .createDataFrame(artificialData)
  .write
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .option(HBaseTableCatalog.newTable, "5")
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .save()

// read
val df = spark
  .read
  .option(HBaseTableCatalog.tableCatalog, catalog)
  .format("org.apache.spark.sql.execution.datasources.hbase")
  .load()

df.count()
 6
Author: Anton Okolnychyi,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2016-12-14 17:11:35