@harshachinta · Created October 17, 2024
Inserting and reading large data with the PROTO type in Spanner using Python.
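The script assumes a proto message shaped like the one below. The package (examples.stream.test) and the repeated string field (values) are taken from the code; the file path and field number are assumptions, since the actual .proto is not included in this gist.

// testdata/wrapper.proto (assumed; reconstructed from usage below)
syntax = "proto3";

package examples.stream.test;

message ArrayWrapper {
  repeated string values = 1;  // large strings are inserted below to force chunked reads
}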
"""Replicating Spanner internal KeyError: 13 caused by merging protos."""
import os
import uuid
from google.cloud import spanner
from google.cloud import spanner_v1
from proto_stream.testdata import wrapper_pb2
INSTANCE_ID = <INSTANCE_ID>
DATABASE_ID = <DATABASE_ID>
spanner_client = spanner.Client()
instance = spanner_client.instance(INSTANCE_ID)
def create_database() -> spanner_v1.database.Database:
    old_db = instance.database(database_id=DATABASE_ID)
    if old_db.exists():
        old_db.drop()

    print("Creating tables...", end=" ")
    statements = [
        "CREATE PROTO BUNDLE(examples.stream.test.ArrayWrapper)",
        (
            "CREATE TABLE merge_error_array(id STRING(36) NOT NULL,"
            " value `examples.stream.test.ArrayWrapper` NOT NULL,"
            " arrayvalue ARRAY<`examples.stream.test.ArrayWrapper`> NOT NULL"
            ") PRIMARY KEY(id)"
        ),
    ]

    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "testdata/descriptors.pb")
    with open(filename, "rb") as proto_descriptor_file:
        proto_descriptors = proto_descriptor_file.read()

    operation = instance.database(
        database_id=DATABASE_ID,
        ddl_statements=statements,
        proto_descriptors=proto_descriptors,
    ).create()
    operation.result(30)  # 30s timeout.
    print("Done.")

    # The operation returns a
    # google.cloud.spanner_admin_database_v1.types.spanner_database_admin.Database,
    # but we want a google.cloud.spanner_v1.database.Database.
    return instance.database(database_id=DATABASE_ID)
def insert_large_row(database: spanner_v1.database.Database) -> None:
    """Insert a row with proto data large enough to be chunked on read."""
    string_len = 8000
    string_count = 100
    letters = string.ascii_lowercase

    row_id = str(uuid.uuid4())
    value = wrapper_pb2.ArrayWrapper(
        values=["a" * string_len for _ in range(string_count)]
    )
    array_value = [
        wrapper_pb2.ArrayWrapper(values=[letters[i % len(letters)] * string_len])
        for i in range(string_count)
    ]
    data = {
        "id": row_id,
        "value": value,
        "arrayvalue": array_value,
    }

    print(f"insert {row_id} with {len(value.values)} values")
    with database.batch() as batch:
        batch.insert(
            table="merge_error_array",
            columns=list(data.keys()),
            values=[list(data.values())],
        )
def main():
    database = create_database()
    insert_large_row(database)

    # Re-create the client and database handle before reading.
    spanner_client = spanner.Client()
    instance = spanner_client.instance(INSTANCE_ID)
    database = instance.database(DATABASE_ID)

    # Read the row back. This is where the error is thrown.
    with database.snapshot() as snapshot:
        # column_info maps column names to proto message instances so the
        # client can deserialize the PROTO columns. Convert the iterator to
        # a list to make sure the (chunked) results are fully consumed.
        results = list(
            snapshot.execute_sql(
                "SELECT id, arrayvalue FROM merge_error_array",
                column_info={
                    "value": wrapper_pb2.ArrayWrapper(),
                    "arrayvalue": wrapper_pb2.ArrayWrapper(),
                },
            )
        )
    print(f"Returned {len(results)} results")
    for row in results:
        print("id: {}, arrayvalue: {}".format(*row))


if __name__ == "__main__":
    main()
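The DDL above also needs testdata/descriptors.pb, a serialized FileDescriptorSet. Assuming the message lives in testdata/wrapper.proto (a hypothetical path, matching the sketch above), one way to generate it is:

protoc --proto_path=. --include_imports --descriptor_set_out=testdata/descriptors.pb testdata/wrapper.proto

--include_imports keeps the transitive imports in the descriptor set, which Spanner needs to resolve the PROTO BUNDLE.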