The use of ultraso
A woman arrested o
1. Introduction {#
The effect of low
Q:
How do I insta
Differences betwee
Liquid Crystal Dis
Q:
How do I conve
This is a great wa
Q:
How to read thQ:
Pyspark - Using withColumn create a dataframe to perform an udf(function)
I have a udf that gets the values of the day that is not present in the dataset. I would like to apply the function for each value for a specific column to see which values are missing.
I have tried the below code:
dataframe1 = sqlContext.sql('SELECT * FROM input_data')
This returns a DF with many columns. I would like to make one DF to test the if/else function with the day as one of the arguments. Can someone please show me an example how to use this function or can point out my mistake?
This is the code that I tried:
def if_else(arg, cond, then, else_):
if arg == True:
then()
else:
else_()
def test(df):
df.withColumn("test", if_else(df.day =='Sat',df.s_id, "Missing"))
The result should be a new DF with the column names "test", "s_id", "D1","D2", "D3","D4".
If df.day =='Sat' return df.s_id (in the function).
If df.day =='Sat' return Missing (else statement).
I have also tried to add the output to the column day like below but that didn't work either:
dataframe1.withColumn("Day", dataframe1.day.isin(dataframe1["day"].alias("day")))
Any help would be appreciated!
A:
As mentioned in the comments, I don't see any errors with the code, it seems to be working as expected. I think it is just a matter of misunderstanding how to retrieve the data in your test function. The problem is in how you are referencing df. It looks like you are using df outside the function, so it's value is not defined, but the method returns a dataframe. I modified your code to test this, see below:
dataframe1 = sqlContext.sql('SELECT * FROM input_data')
def if_else(df):
df.withColumn("test", if_else(df.day =='Sat',df.s_id, "Missing"))
test(dataframe1)
This returned the following dataframe:
When referencing a DF with the . notation (in this case, dataframe1), it will automatically use df as a reference to the dataframe. That's what is confusing you here.
As a side note, using udf for this should be a much more efficient way to do it, and probably much easier to understand. Using udf you can pass the arguments in using args, you can handle all kinds of columns without much fuss. See below:
dataframe1 = sqlContext.sql('SELECT * FROM input_data')
def if_else(df):
df.withColumn("test", if_else(df.day.isin(["Sat", "Sun"]),df.s_id, "Missing"))
dataframe1.withColumn("test", if_else(dataframe1.columns[0], "missing"))
This is all assuming you need to do this check for a single column. If you need it for all columns, your udf function will probably have to deal with a lot more logic, and you might want to reconsider using a udf or a map operation:
dataframe1.withColumn("test", if_else(lambda x: all(x.isin(["Sat", "Sun"])), df.s_id, "Missing"))
A:
It's working for me:
def test(day_cols):
cols_to_test = ['s_id']
day_cols = ["s_id", "D1", "D2", "D3", "D4"]
df = spark.createDataFrame(
sc.parallelize(
[
(u'D1',u'Sat',u'Sun'),
(u'D2',u'Sat',u'Sun'),
(u'D3',u'Sat',u'Sun'),
(u'D4',u'Sat',u'Sun')
]
),
["D1", "D2", "D3", "D4", "s_id"]
)
return if_else(df.select(*day_cols), "missing")
def if_else(condition, then, otherwise):
if condition:
then()
else:
otherwise()
test(day_cols=['s_id'])
Output:
:26: error: NameError: name 's_id' is not defined
I think if you are using dataframe.column as a name, it is not referring the row. Therefore it is not getting executed.
So try this :
def test(df):
df.withColumn("test", if_else(df.day =='Sat',df.s_id, "Missing"))
test(dataframe1)