Spark

Load CSV to Dataframe

val df = sqlContext.read.
         format("com.databricks.spark.csv").
         schema(struct).
         load("hdfs://itrihd34:8020/tmp/tmp_gohappy_cooc_mf_train");

where struct is a schema defined as below.

import org.apache.spark.sql.types._

val struct = StructType(
  StructField("ven_session", StringType) ::
  StructField("gid", StringType) ::
  StructField("page_type", StringType) ::
  StructField("categ_code", StringType) ::
  StructField("api_logtime", StringType) :: Nil)
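
A cheap sanity check after loading (a sketch, assuming the HDFS path above is readable) is to print the schema and peek at a few rows:

// All fields were declared StringType; StructField defaults to nullable = true.
df.printSchema()
// root
//  |-- ven_session: string (nullable = true)
//  ...
df.show(3)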

Dataframe - add row_id column

zipWithUniqueId

val gid_df = df.select($"gid").distinct();
// gid_df is a val, so assign the indexed result to a new name
val gid_idx_df = gid_df.map(r => r.getString(0)).zipWithUniqueId().toDF("gid", "gIdx");

or

import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()

see monotonicallyIncreasingId
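
One caveat: monotonicallyIncreasingId guarantees unique, monotonically increasing ids, but not consecutive ones. The upper 31 bits encode the partition index and the lower 33 bits a per-partition counter, so each partition starts at a different offset. A minimal sketch:

import org.apache.spark.sql.functions.monotonicallyIncreasingId

// With 2 partitions, partition 0 yields ids 0, 1, 2, ... while
// partition 1 starts at 2^33 = 8589934592 -- unique, but with gaps.
df.repartition(2).withColumn("id", monotonicallyIncreasingId).show()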

Compute i2i Co-occurrence matrix

import org.apache.spark.sql.functions.countDistinct

val i2i_cooc = 
  gid2uid_df.join( 
    gid2uid_df.select($"gid" as "r_gid", $"uid" as "r_uid"), 
    $"uid" === $"r_uid" and $"gid" < $"r_gid" 
  ).groupBy(
    $"gid", $"r_gid"
  ).agg(
    countDistinct("uid") as "cnt"  // groupBy columns are kept automatically
  )

where gid2uid_df looks like below.

scala> gid2uid_df.show()
+-------+----+                                                                  
|    gid| uid|
+-------+----+
|5342655|1711|
|5342655|1711|
|4017442| 622|
|5266383| 622|
|4318357| 819|
|5301213| 819|
|4571433|3034|
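
Tracing the join over just the sample rows above makes the logic concrete. This pure-Scala sketch (independent of Spark, data taken from the listing) mirrors the self-join, the gid < r_gid ordering that keeps one canonical pair per gid combination, and the countDistinct:

val gid2uid = Seq(
  ("5342655", "1711"), ("5342655", "1711"),
  ("4017442", "622"),  ("5266383", "622"),
  ("4318357", "819"),  ("5301213", "819"),
  ("4571433", "3034"))

val cooc = (for {
  (g, u)   <- gid2uid
  (rg, ru) <- gid2uid
  if u == ru && g < rg              // same user, canonical gid ordering
} yield ((g, rg), u))
  .groupBy(_._1)
  .mapValues(_.map(_._2).distinct.size)  // countDistinct("uid")

// For these rows: (4017442,5266383) -> 1 and (4318357,5301213) -> 1;
// the duplicated (5342655, 1711) pair drops out since g < rg fails.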

Convert a Column of Dataframe to List

val sessions = df.select("ven_session").rdd.map(r => r(0)).collect()

Where in clause (isin)

df.where( $"gid".isin( List("4497546","4946410"):_* )).show()

where df looks like below.

+--------------------+-------+---------+----------+--------------------+
|         ven_session|    gid|page_type|categ_code|         api_logtime|
+--------------------+-------+---------+----------+--------------------+
|7cde1546-49fb-41b...|4288290|      gop|     71635|2016-06-01 06:31:...|
|d7cb2692-e8a1-4a3...|5268013|      gop|    320737|2016-06-01 06:49:...|
|4207a0f7-2209-4f4...|5110712|      gop|     29139|2016-06-01 06:14:...|

[error] ... value $ is not a member of StringContext

Symptom
[error] /root/cooc_mf_als/src/main/scala/AppMF.scala:37: value $ is not a member of StringContext
[error]         val uTrans_df = df.select($"ven_session").distinct();
[error]                                   ^
Solution

Put import sqlContext.implicits._ right after the SQLContext is created, i.e. before any expression that uses $.

val sqlContext = new org.apache.spark.sql.SQLContext(sc);
import sqlContext.implicits._;
