Skip to content

Instantly share code, notes, and snippets.

@RustedBones
Created October 24, 2023 08:21
Show Gist options
  • Save RustedBones/faabfd0b79d05d39a069114e9a10fd1e to your computer and use it in GitHub Desktop.
Save RustedBones/faabfd0b79d05d39a069114e9a10fd1e to your computer and use it in GitHub Desktop.
BQ ReadSession selectedFields on null record coalesced to 0 value
package com.example
import com.google.api.gax.core.FixedCredentialsProvider
import com.google.api.gax.rpc.FixedHeaderProvider
import com.google.cloud.bigquery.storage.v1.{BigQueryReadClient, BigQueryReadSettings, CreateReadSessionRequest, DataFormat, ReadRowsRequest, ReadSession}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import scala.jdk.CollectionConverters._
/**
 * Integration test that opens a BigQuery Storage read session on a table, restricted to
 * the columns listed in `fields`, requests the rows in Avro format, and prints the Avro
 * schema returned by the session followed by every decoded record.
 *
 * The test contains no assertions: it only prints, so the output has to be inspected by
 * hand.
 *
 * `projectId`, `datasetId`, `tableId` and the `gcpCredentials` local are `???`
 * placeholders and must be replaced with real values before running; as written, `???`
 * throws `NotImplementedError` as soon as the suite is instantiated.
 */
class BigqueryClientTestIT extends AnyFlatSpec with Matchers {

  // Coordinates of the table to read. Placeholders -- fill in before running.
  val projectId = ???
  val datasetId = ???
  val tableId = ???

  // Columns requested through TableReadOptions.selectedFields.
  // NOTE(review): "optional.int" presumably addresses a field `int` nested in a nullable
  // record column `optional` -- confirm against the table schema.
  val fields = Seq("required", "optional.int")

  "BigqueryClient" should "export properly" in {
    // Placeholder -- supply real credentials before running.
    val gcpCredentials = ???

    val header = FixedHeaderProvider.create("user-agent", "it-test")
    val settingsBuilder = BigQueryReadSettings.newBuilder
      .setCredentialsProvider(FixedCredentialsProvider.create(gcpCredentials))
      .setTransportChannelProvider(
        BigQueryReadSettings.defaultGrpcTransportProviderBuilder
          .setHeaderProvider(header)
          .build
      )

    // createReadSession: initial, max and total RPC timeouts are all set to two hours.
    val createReadSessionTimeout = org.threeten.bp.Duration.ofHours(2)
    val createReadSessionSettings = settingsBuilder.getStubSettingsBuilder.createReadSessionSettings
    createReadSessionSettings.setRetrySettings(
      createReadSessionSettings.getRetrySettings.toBuilder
        .setInitialRpcTimeout(createReadSessionTimeout)
        .setMaxRpcTimeout(createReadSessionTimeout)
        .setTotalTimeout(createReadSessionTimeout)
        .build
    )

    // splitReadStream: initial, max and total RPC timeouts are all set to thirty seconds.
    val splitReadStreamTimeout = org.threeten.bp.Duration.ofSeconds(30)
    val splitReadStreamSettings = settingsBuilder.getStubSettingsBuilder.splitReadStreamSettings
    splitReadStreamSettings.setRetrySettings(
      splitReadStreamSettings.getRetrySettings.toBuilder
        .setInitialRpcTimeout(splitReadStreamTimeout)
        .setMaxRpcTimeout(splitReadStreamTimeout)
        .setTotalTimeout(splitReadStreamTimeout)
        .build
    )

    val client = BigQueryReadClient.create(settingsBuilder.build)
    try {
      val tableReadOptionsBuilder = ReadSession.TableReadOptions.newBuilder()
      tableReadOptionsBuilder.addAllSelectedFields(fields.asJava)

      val readSessionBuilder = ReadSession.newBuilder()
      readSessionBuilder.setTable(s"projects/$projectId/datasets/$datasetId/tables/$tableId")
      readSessionBuilder.setReadOptions(tableReadOptionsBuilder)
      readSessionBuilder.setDataFormat(DataFormat.AVRO)

      val createReadSessionRequest = CreateReadSessionRequest
        .newBuilder()
        .setParent(s"projects/$projectId")
        .setReadSession(readSessionBuilder)
        // .setMaxStreamCount(streamCount)
        .build()

      val readSession = client.createReadSession(createReadSessionRequest)

      // The schema is taken from the session itself, i.e. it is whatever the service
      // returned for this request, not something derived locally.
      val avroSchema = new Schema.Parser().parse(readSession.getAvroSchema.getSchema)
      println(avroSchema.toString(true))

      val reader = new GenericDatumReader[GenericRecord](avroSchema)

      // Go through an Iterator so that responses are fetched, decoded and printed one at
      // a time. Chaining flatMap directly on the strict Buffer returned by asScala would
      // first collect every response of every stream, then every decoded record, before
      // printing anything. The records printed and their order are the same either way.
      readSession.getStreamsList.asScala.iterator
        .flatMap { stream =>
          val request = ReadRowsRequest.newBuilder
            .setReadStream(stream.getName)
            .build()
          client.readRowsCallable().call(request).iterator().asScala
        }
        .flatMap { readRowsResponse =>
          val avroRows = readRowsResponse.getAvroRows
          // One decoder per response: the serialized block holds getRowCount records
          // back to back, read sequentially from the same decoder.
          val decoder = DecoderFactory.get
            .binaryDecoder(avroRows.getSerializedBinaryRows.toByteArray, null)
          (0L until readRowsResponse.getRowCount).map(_ => reader.read(null, decoder))
        }
        .foreach(println)
    } finally {
      // Always release the client, even if session creation or reading failed.
      client.close()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment