Поиск и замена в Apache Spark

Мы создали два набора данных, предложениеDataFrame, предложениеDataFrame2, где должна произойти замена поиска.

предложениеDataFrame2 хранит условия поиска и замены.

Мы также выполнили все 11 типов соединений: «внутреннее», «внешнее», «полное», «полное», «левое», «левое», «правое внешнее», «правое», «левое полу», «левое», «крест». ' ни один из них не дал нам результат.

Не могли бы вы сообщить нам, где мы идем неправильно, и пожалуйста, укажите нам правильное направление.

        List<Row> data = Arrays.asList(
            RowFactory.create(0, "Allen jeevi pramod Allen"),
            RowFactory.create(1,"sandesh Armstrong jeevi"),
            RowFactory.create(2,"harsha Nischay DeWALT"));

        StructType schema = new StructType(new StructField[] {
        new StructField("label", DataTypes.IntegerType, false,
          Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
          Metadata.empty()) });
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);


        List<Row> data2 = Arrays.asList(
          RowFactory.create("Allen", "Apex Tool Group"),
          RowFactory.create("Armstrong","Apex Tool Group"),
          RowFactory.create("DeWALT","StanleyBlack"));

        StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false,
          Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false,
          Metadata.empty()) });
        Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

        Dataset<Row> remainingElements=sentenceDataFrame.join(sentenceDataFrame2,sentenceDataFrame.col("label").equalTo(sentenceDataFrame2.col("label2")),"cross");
        System.out.println("Left anti join count :"+remainingElements.count());

Ввод

Аллен Дживи Прамод Аллен
Сандеш Армстронг Дживи
Харша Нишай ДЕУОЛТ

Ожидаемый результат

Apex Tool Group jeevi pramod Apex Tool Group
sandesh Apex Tool Group jeevi
сурша Nischay StanleyBlack


person Nischay    schedule 17.04.2017    source источник


Ответы (3)


Для условий соединения, которые не включают в себя такие простые равенства, вам потребуется использовать пользовательские функции Spark (UDF).

Вот фрагмент кода JUnit, который не компилируется напрямую, но показывает соответствующий импорт и логику. Однако Java API довольно многословен. Я оставлю задачу сделать это на Scala в качестве упражнения для читателя. Это будет намного лаконичнее.

Статический импорт требуется для методов callUDF() и col().

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.api.java.UDF3;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

@Test
public void testSomething() {
    List<Row> data = Arrays.asList(
        RowFactory.create(0, "Allen jeevi pramod Allen"),
        RowFactory.create(1, "sandesh Armstrong jeevi"),
        RowFactory.create(2, "harsha Nischay DeWALT")
    );

    StructType schema = new StructType(new StructField[] {
        new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false, Metadata.empty()) 
    });
    Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

    List<Row> data2 = Arrays.asList(
        RowFactory.create("Allen", "Apex Tool Group"),
        RowFactory.create("Armstrong","Apex Tool Group"),
        RowFactory.create("DeWALT","StanleyBlack")
    );

    StructType schema2 = new StructType(new StructField[] {
        new StructField("label2", DataTypes.StringType, false, Metadata.empty()),
        new StructField("sentence2", DataTypes.StringType, false, Metadata.empty()) 
    });
    Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

    UDF2<String, String, Boolean> contains = new UDF2<String, String, Boolean>() {
        private static final long serialVersionUID = -5239951370238629896L;

        @Override
        public Boolean call(String t1, String t2) throws Exception {
            return t1.contains(t2);
        }
    };
    spark.udf().register("contains", contains, DataTypes.BooleanType);

    UDF3<String, String, String, String> replaceWithTerm = new UDF3<String, String, String, String>() {
        private static final long serialVersionUID = -2882956931420910207L;

        @Override
        public String call(String t1, String t2, String t3) throws Exception {
            return t1.replaceAll(t2, t3);
        }
    };
    spark.udf().register("replaceWithTerm", replaceWithTerm, DataTypes.StringType);

    Dataset<Row> joined = sentenceDataFrame.join(sentenceDataFrame2, callUDF("contains", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2")))
                                           .withColumn("sentence_replaced", callUDF("replaceWithTerm", sentenceDataFrame.col("sentence"), sentenceDataFrame2.col("label2"), sentenceDataFrame2.col("sentence2")))
                                           .select(col("sentence_replaced"));

    joined.show(false);
}

Выход:

+--------------------------------------------+
|sentence_replaced                           |
+--------------------------------------------+
|Apex Tool Group jeevi pramod Apex Tool Group|
|sandesh Apex Tool Group jeevi               |
|harsha Nischay StanleyBlack                 |
+--------------------------------------------+
person Ivan Gozali    schedule 17.04.2017

Мы можем использовать функции replaceAll и UDF для достижения ожидаемого результата.

public class Test {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
        SQLContext sqlContext = new SQLContext(sc);
        SparkSession spark = SparkSession.builder().appName("JavaTokenizerExample").getOrCreate();

        List<Row> data = Arrays.asList(
        RowFactory.create(0, "Allen jeevi pramod Allen"),
        RowFactory.create(1, "sandesh Armstrong jeevi"),
        RowFactory.create(2, "harsha Nischay DeWALT")
    );

        StructType schema = new StructType(new StructField[] {
        new StructField("label", DataTypes.IntegerType, false,
                Metadata.empty()),
        new StructField("sentence", DataTypes.StringType, false,
                Metadata.empty()) });
        Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);
        UDF1 mode = new UDF1<String, String>() {
            public String call(final String types) throws Exception {
                return types.replaceAll("Allen", "Apex Tool Group")
                .replaceAll("Armstrong","Apex Tool Group")
                .replaceAll(""DeWALT","StanleyBlack"")
            }
        };

        sqlContext.udf().register("mode", mode, DataTypes.StringType);

        sentenceDataFrame.createOrReplaceTempView("people");
        Dataset<Row> newDF = sqlContext.sql("SELECT mode(sentence), label FROM people").withColumnRenamed("UDF(sentence)", "sentence");
        newDF.show(false);
}
}

выход

  +--------------------------------------------+------+
  |sentence                                    |label |
  +--------------------------------------------+------+
  |Apex Tool Group jeevi pramod Apex Tool Group|  0   |
  |sandesh Apex Tool Group jeevi               |  1   |
  |harsha Nischay StanleyBlack                 |  2   |
  +--------------------------------------------+------+
person Sandesh Puttaraj    schedule 06.05.2017

До сих пор сталкиваюсь с похожей проблемой

Ввод

Аллен Армстронг jeevi pramod Аллен
Сандеш Армстронг jeevi
харша нишай DEWALT

Вывод

Apex Tool Group Армстронг jeevi pramod Apex Tool Group
Allen Apex Tool Group jeevi pramod Allen
sandesh Apex Tool Group jeevi
суровая ниша StanleyBlack

Ожидаемый результат

Apex Tool Group Apex Tool Group jeevi pramod Apex Tool Group
песочница Apex Tool Group jeevi
грубая ниша StanleyBlack

Получил этот вывод, когда есть несколько замен подряд.

Есть ли какой-либо другой метод, которому необходимо следовать, чтобы получить правильный результат? Или это ограничение UDF?

person Nischay    schedule 27.04.2017