Spark
Load a CSV file into a DataFrame
// Read the training CSV from HDFS into a DataFrame using the
// spark-csv data source, applying the explicit schema `struct`
// (defined below) instead of inferring column types.
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .schema(struct)
  .load("hdfs://itrihd34:8020/tmp/tmp_gohappy_cooc_mf_train")
where `struct` is the schema defined below.
// Explicit schema for the clickstream CSV: every field is kept as a
// raw string (timestamps/codes are parsed later if needed).
val struct = StructType(Seq(
  StructField("ven_session", StringType),
  StructField("gid",         StringType),
  StructField("page_type",   StringType),
  StructField("categ_code",  StringType),
  StructField("api_logtime", StringType)
))
DataFrame — add a row-id column
// Build a (gid, gIdx) lookup table: one row per distinct gid, each
// paired with a unique Long index from zipWithUniqueId.
// Fix: the original declared `gid_df` as a `val` and then reassigned
// it on the next line, which does not compile — fold everything into
// a single expression instead.
// NOTE(review): on Spark 1.x, DataFrame.map yields an RDD, so
// zipWithUniqueId is available; `toDF` needs `import sqlContext.implicits._`.
val gid_df = df.select($"gid").distinct()
  .map(r => r.getString(0))
  .zipWithUniqueId()
  .toDF("gid", "gIdx")
or
// Alternative: tag each row with a monotonically increasing (but NOT
// consecutive) 64-bit id — values are unique per row, ordered within a
// partition, but have gaps across partitions.
// NOTE(review): `monotonicallyIncreasingId` is the old camelCase name;
// it was deprecated in Spark 2.0 in favor of
// `monotonically_increasing_id()` — switch if running on Spark 2.x.
import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
Compute the i2i co-occurrence matrix
// Item-to-item co-occurrence: self-join gid2uid_df on the user id to
// pair every two items seen by the same user, then count distinct
// users per item pair.
// Fixes vs. the original:
//  - `agg($"gid", $"r_gid", countDistinct(...))` passed plain
//    (non-aggregate) columns into `agg`, which is invalid/redundant —
//    the groupBy keys are already part of the output; only the
//    aggregate expression belongs in `agg`.
//  - use `&&` for the join condition (idiomatic Column conjunction).
// `$"gid" < $"r_gid"` keeps exactly one ordering of each pair, so a
// pair is never counted twice (and self-pairs are excluded).
val i2i_cooc =
  gid2uid_df
    .join(
      gid2uid_df.select($"gid" as "r_gid", $"uid" as "r_uid"),
      $"uid" === $"r_uid" && $"gid" < $"r_gid"
    )
    .groupBy($"gid", $"r_gid")
    .agg(countDistinct("uid") as "cnt")
    .select($"gid", $"r_gid", $"cnt")
where gid2uid_df looks like the following:
scala> gid2uid_df.show()
+-------+----+
| gid| uid|
+-------+----+
|5342655|1711|
|5342655|1711|
|4017442| 622|
|5266383| 622|
|4318357| 819|
|5301213| 819|
|4571433|3034|
Convert a DataFrame column to a local collection (List)
val sessions = df.select("ven_session").rdd.map(r => r(0)).collect()
Where in clause (isin)
df.where( $"gid".isin( List("4497546","4946410"):_* )).show()
where df looks like the following:
+--------------------+-------+---------+----------+--------------------+
| ven_session| gid|page_type|categ_code| api_logtime|
+--------------------+-------+---------+----------+--------------------+
|7cde1546-49fb-41b...|4288290| gop| 71635|2016-06-01 06:31:...|
|d7cb2692-e8a1-4a3...|5268013| gop| 320737|2016-06-01 06:49:...|
|4207a0f7-2209-4f4...|5110712| gop| 29139|2016-06-01 06:14:...|
[error] ... value $ is not a member of StringContext
Symptom
[error] /root/cooc_mf_als/src/main/scala/AppMF.scala:37: value $ is not a member of StringContext
[error] val uTrans_df = df.select($"ven_session").distinct();
[error] ^
Solution
Put `import sqlContext.implicits._` immediately after the SQLContext is created — i.e. before any expression that uses the `$` column interpolator.
// Create the SQLContext from the existing SparkContext, then bring its
// implicits (the `$"col"` interpolator, `toDF`, etc.) into scope.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._