We want to copy a large Spark dataframe into Oracle, but I am finding the tuning options a bit limited. Looking at the Spark documentation, the only related tuning property I could find for a JDBC write is numPartitions.
However, the dataframe we want to write has 700,000,000 records and the Oracle server only has 32 cores, so I don't want to overload the database with too many concurrent connections. My understanding is that if I set numPartitions to 32, it will effectively do a .repartition(32) on the large dataset and then write each partition into Oracle. 32 partitions in Spark is not enough for a dataset this size and will cause memory issues.
Is there a way to break the job into further pieces so it doesn't try to do everything at once, but instead does 50,000,000 (or something) at a time?
I was thinking of something like this, but I was hoping there's something more efficient:
// Imagine "df" is the incoming dataframe we want to write.
import org.apache.spark.sql.functions.rand
import org.apache.spark.sql.types.IntegerType

val threads = 32
val recordsPerThread = 500000
val chunkSize = threads * recordsPerThread

// Number of chunks needed to cover the whole dataframe.
val total = df.count
val chunks = math.ceil(total.toDouble / chunkSize).toInt

// Randomly assign each row to a chunk in [0, chunks).
val chunkDf = df.withColumn("CHUNK_NUM", (rand() * chunks).cast(IntegerType))

// Write one chunk at a time so only ~chunkSize records are in flight at once.
for (chunkNum <- 0 until chunks) {
  chunkDf.filter(s"CHUNK_NUM = ${chunkNum}")
    .drop("CHUNK_NUM")
    .write
    .format("jdbc")
    .options(...) // DB info + numPartitions = 32
    .save()
}
Basically, I'm dividing the dataset into "chunks", each of which can be written at once with 32 threads (numPartitions). I feel like there should be a more efficient way of doing this, but I can't seem to find it in the documentation.
I'm also using batchSize set to 10000 to reduce round trips, but I'm still limited by how many threads I want making round trips to Oracle, and by how large partitions in Spark can be.
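For reference, here is roughly what the write options look like right now; the connection URL, table name, credentials, and driver class below are placeholders rather than our real values:

// Sketch of the current JDBC write; connection details are placeholders.
df.write
  .format("jdbc")
  .option("url", "jdbc:oracle:thin:@//dbhost:1521/ORCLPDB") // placeholder connection string
  .option("dbtable", "TARGET_SCHEMA.TARGET_TABLE")          // placeholder target table
  .option("user", "spark_writer")                           // placeholder credentials
  .option("password", "********")
  .option("driver", "oracle.jdbc.OracleDriver")
  .option("numPartitions", "32")  // max parallel connections on write
  .option("batchsize", "10000")   // rows per JDBC batch, to cut round trips
  .save()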
2 Answers
I was overthinking it. We can constrain how much Spark writes at once by simply constraining the resources we give Spark. If I set numPartitions to 500 but only give Spark a single 32-core worker, it will only write 32 partitions at a time, limiting how hard we hammer Oracle. This effectively "chunks" the job.
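A rough sketch of that setup (the executor settings and the jdbcOptions map here are illustrative, not the exact values from my job):

import org.apache.spark.sql.SparkSession

// Keep numPartitions high, but cap the executor at 32 cores so at most
// 32 partitions (and thus 32 JDBC connections) write concurrently.
val spark = SparkSession.builder()
  .appName("oracle-bulk-write")
  .config("spark.executor.cores", "32")     // cores per executor
  .config("spark.executor.instances", "1")  // a single 32-core executor
  .getOrCreate()

df.write
  .format("jdbc")
  .options(jdbcOptions)           // hypothetical Map with URL, credentials, batchsize
  .option("numPartitions", "500") // many small partitions, only 32 in flight at a time
  .save()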
In your case I would simply repartition:
val threads = 32
df.repartition(threads).write.format("jdbc").options(...).save()
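This gives exactly 32 write tasks. Spark's JDBC writer opens one connection per partition, so at most 32 connections hit Oracle at once, and batchsize still controls how many rows go over in each round trip.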