Llamar a la función Java/Scala desde una tarea


Antecedentes

Mi pregunta original aquí fue ¿Por qué usar DecisionTreeModel.predict dentro de la función de mapa plantea una excepción? y está relacionado con Cómo generar tuplas de (etiqueta original, etiqueta predicha) en Spark con MLlib?

Cuando usamos Scala API una forma recomendada de obtener predicciones para RDD[LabeledPoint] usando DecisionTreeModel es simplemente mapear sobre RDD:

val labelAndPreds = testData.map { point =>
  val prediction = model.predict(point.features)
  (point.label, prediction)
}

Desafortunadamente un enfoque similar en PySpark no funciona tan bien:

labelsAndPredictions = testData.map(
    lambda lp: (lp.label, model.predict(lp.features))
labelsAndPredictions.first()

Excepción: Parece que está intentando hacer referencia a SparkContext desde una variable de difusión, acción o transforamción. SparkContext solo se puede usar en el controlador, no en el código que ejecuta en workers. Para obtener más información, consulte SPARK-5063.

En lugar de eso la documentación oficial recomienda algo como esto:

predictions = model.predict(testData.map(lambda x: x.features))
labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)

Entonces, ¿qué está pasando aquí? No hay ninguna variable de difusión aquí y Scala API define predict como sigue:

/**
 * Predict values for a single data point using the model trained.
 *
 * @param features array representing a single data point
 * @return Double prediction from the trained model
 */
def predict(features: Vector): Double = {
  topNode.predict(features)
}

/**
 * Predict values for the given data set using the model trained.
 *
 * @param features RDD representing data points to be predicted
 * @return RDD of predictions for each of the given data points
 */
def predict(features: RDD[Vector]): RDD[Double] = {
  features.map(x => predict(x))
}

Así que al menos a primera vista llamar desde acción o transformación no es un problema ya que la predicción parece ser una operación local.

Explicación

Después de algunas investigaciones me di cuenta de que la fuente del problema es un JavaModelWrapper.call método invocado desde DecisionTreeModel.predecir. It acceso SparkContext que se requiere para llamar a la función Java:

callJavaFunc(self._sc, getattr(self._java_model, name), *a)

Pregunta

En el caso de DecisionTreeModel.predict hay un solución recomendada y todo el código requerido ya es parte de la API de Scala, pero ¿hay alguna forma elegante de manejar un problema como este en general?

Las únicas soluciones que se me ocurren en este momento son bastante pesadas:

  • empujando todo a JVM ya sea extendiendo las clases Spark a través de Conversiones implícitas o agregando algún tipo de envoltorios
  • usando la pasarela Py4j directamente
Author: Community, 2015-07-28

1 answers

La comunicación utilizando la pasarela Py4J predeterminada simplemente no es posible. Para entender por qué tenemos que echar un vistazo al siguiente diagrama del documento de Funcionamiento interno de PySpark [1]:

introduzca la descripción de la imagen aquí

Dado que Py4J gateway se ejecuta en el controlador, no es accesible para los intérpretes de Python que se comunican con los trabajadores de JVM a través de sockets (Consulte, por ejemplo PythonRDD / rdd.py).

Teóricamente podría ser posible crear una pasarela Py4J separada para cada trabajador, pero en la práctica es poco probable que sea útil. Ignorando problemas como la confiabilidad Py4J simplemente no está diseñado para realizar tareas intensivas en datos.

¿Hay alguna solución alternativa?

  1. Usando Spark SQL Data Sources API para envolver código JVM.

    Pros: Compatible, de alto nivel, no requiere acceso a la API interna de PySpark

    Contras: Relativamente prolijo y no muy bien documentado, limitado principalmente a la entrada datos

  2. Operando en DataFrames usando UDFs de Scala.

    Pros : Fácil de implementar (ver Spark: ¿Cómo mapear Python con Funciones Definidas por el usuario de Scala o Java?), no hay conversión de datos entre Python y Scala si los datos ya están almacenados en un DataFrame, acceso mínimo a Py4J

    Contras : Requiere acceso a la puerta de enlace Py4J y métodos internos, limitado a Spark SQL, difícil de depurar, no soportado

  3. Creando Scala de alto nivel interfaz de forma similar a como se hace en MLlib.

    Pros: Flexible, capacidad para ejecutar código complejo arbitrario. Puede ser don directamente en RDD (ver por ejemplo MLlib model wrappers) o con DataFrames (ver Cómo usar una clase Scala dentro de Pyspark). Esta última solución parece ser mucho más amigable, ya que todos los detalles del ser-de ya están manejados por la API existente.

    Contras: Nivel bajo, conversión de datos requerida, igual que UDFs requiere acceso a Py4J y API interna, no soportado

    Algunos ejemplos básicos se pueden encontrar en Transformando PySpark RDD con Scala

  4. Uso de la herramienta de gestión de flujo de trabajo externo para cambiar entre trabajos de Python y Scala / Java y pasar datos a un DFS.

    Pros: Fácil de implementar, cambios mínimos en el código

    Contras : Costo de lectura / escritura de datos (Alluxio?)

  5. Usando shared SQLContext (ver para ejemplo Apache Zeppelin o Livy ) para pasar datos entre idiomas invitados usando tablas temporales registradas.

    Pros: muy adecuado para el análisis interactivo

    Contras : No tanto para trabajos por lotes (Zeppelin) o puede requerir orquestación adicional (Livy)


    [22] Joshua Rosen. (2014, August 04) PySpark Internals. Recuperado de https://cwiki.apache.org/confluence/display/SPARK/PySpark + Internos
 37
Author: zero323,
Warning: date(): Invalid date.timezone value 'Europe/Kyiv', we selected the timezone 'UTC' for now. in /var/www/agent_stack/data/www/ajaxhispano.com/template/agent.layouts/content.php on line 61
2017-09-01 10:35:38