I am trying to retrieve data from a table in HDFS in which one column contains timestamps.

I am connected to HDFS through CDSW and run a Python script that opens a Spark session and executes an SQL query to retrieve some rows from the table. Running the same SQL query in the HUE Impala editor returns the proper values, but in CDSW the Python script returns only None values for the timestamp column. How can I retrieve my data properly? It is a huge table, so I cannot simply export a CSV file from the Impala editor; I want to retrieve the data for migration to another database. The script I run in CDSW is the following, where measurement_time returns None values instead of the timestamps that appear in the HUE Impala editor:
import os
import sys

import numpy as np
import pandas as pd
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.functions import lit, to_utc_timestamp, unix_timestamp

# os is used below but was never imported; these must be set before the
# session is created so driver and executors use the same Python.
os.environ['PYSPARK_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/cloudera/parcels/Anaconda-5.1.0.1/bin/python3.6'
os.environ['PROJ_LIB'] = '/home/cdsw/.conda/envs/python3.6/share/proj'
# Parenthesised builder chain so individual options can be commented.
spark = (
    SparkSession.builder
    .master("yarn")
    .config("spark.sql.session.timeZone", "UTC")
    .config("spark.submit.deployMode", "client")
    .config("spark.eventLog.enabled", "true")
    .config("spark.executor.instances", "30")
    .config("spark.executor.cores", "2")
    .config("spark.executor.memory", "4g")
    .config("spark.rpc.message.maxSize", "1024")
    .config("spark.executor.memoryOverhead", "800")
    .config("spark.driver.memory", "4g")
    .config("spark.driver.memoryOverhead", "800")
    # Was misspelled "spark.spark.driver.maxResultSize", which Spark ignores.
    .config("spark.driver.maxResultSize", "4g")
    # The original keys "spark.executor.dynamicAllocation.*" do not exist and
    # "false" is not a valid executor count; disabling dynamic allocation is
    # the apparent intent, since spark.executor.instances is set explicitly.
    .config("spark.dynamicAllocation.enabled", "false")
    .config("spark.sql.broadcastTimeout", "1000")
    .config("spark.kryoserializer.buffer.max", "1024m")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.avro.datetimeRebaseModeInRead", "LEGACY")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)
sc = spark.sparkContext
sqlContext = SQLContext(sc)
# Allow dynamic partitioning for any Hive writes later in the job.
sqlContext.sql('SET hive.exec.dynamic.partition = true')
sqlContext.sql('SET hive.exec.dynamic.partition.mode = nonstrict')
pd.set_option("display.max_rows", None, "display.max_columns", None)
qry = """SELECT parameter_id, measurement_time, value, par_dt FROM aums.eems_archive_data WHERE par_dt = '20240101' LIMIT 10"""
spark_df = spark.sql(qry)
data_df = spark_df.toPandas()
print(data_df.head(1))
I am expecting:

                           parameter_id     measurement_time  value    par_dt
0  d7cc8e82-7ad1-11ed-aaf9-fa163ed4a1d0  2024-01-01 01:34:24  13.45  20240101

and I get:

                           parameter_id measurement_time  value    par_dt
0  d7cc8e82-7ad1-11ed-aaf9-fa163ed4a1d0             None  13.45  20240101
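One way to narrow this down (a minimal diagnostic sketch, assuming the session and table defined above) is to check whether Spark itself reads NULLs, or whether the timestamps are only lost in the Arrow-backed toPandas() conversion, and to inspect the raw stored value with a string cast:

# Does Spark itself see NULLs, or only pandas after conversion?
spark_df.select("measurement_time").show(10, truncate=False)
print(spark_df.filter(spark_df.measurement_time.isNull()).count())

# Inspect the raw stored value without timestamp conversion.
spark.sql("""
    SELECT CAST(measurement_time AS STRING) AS measurement_time_raw
    FROM aums.eems_archive_data
    WHERE par_dt = '20240101'
    LIMIT 10
""").show(truncate=False)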
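If the values do display correctly in Spark via show() and are only lost in toPandas(), a workaround sketch for the migration (assuming the rendered string format is parseable by pandas; this sidesteps the conversion rather than explaining it) is to transfer the column as a string, bypassing the Arrow timestamp conversion, and parse it on the pandas side:

# Workaround sketch: transfer the timestamp as a string, parse in pandas.
qry = """
    SELECT parameter_id,
           CAST(measurement_time AS STRING) AS measurement_time,
           value, par_dt
    FROM aums.eems_archive_data
    WHERE par_dt = '20240101'
    LIMIT 10
"""
data_df = spark.sql(qry).toPandas()
data_df["measurement_time"] = pd.to_datetime(data_df["measurement_time"])
print(data_df.head(1))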