Inserting and reading large data with the PROTO type in Cloud Spanner using Python
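The script expects a compiled descriptor set at testdata/descriptors.pb and generated Python bindings importable as proto_stream.testdata.wrapper_pb2. The gist does not include the .proto source; from the way the script uses it, it is presumably shaped like the sketch below (the file name wrapper.proto is an assumption):

    // wrapper.proto -- assumed layout; only the package and message shape
    // are implied by the script.
    syntax = "proto3";

    package examples.stream.test;

    message ArrayWrapper {
      repeated string values = 1;
    }

The descriptor set and Python bindings could then be generated with protoc, e.g.:

    protoc --include_imports --descriptor_set_out=testdata/descriptors.pb --python_out=. wrapper.proto
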
"""Replicating Spanner internal KeyError: 13 caused by merging protos.""" | |
import os | |
import uuid | |
from google.cloud import spanner | |
from google.cloud import spanner_v1 | |
from proto_stream.testdata import wrapper_pb2 | |
INSTANCE_ID = <INSTANCE_ID> | |
DATABASE_ID = <DATABASE_ID> | |
spanner_client = spanner.Client() | |
instance = spanner_client.instance(INSTANCE_ID) | |
def create_database() -> spanner_v1.database.Database:
    # Drop any database left over from a previous run so the repro starts clean.
    old_db = instance.database(database_id=DATABASE_ID)
    if old_db.exists():
        old_db.drop()
    print("Creating tables...", end=" ")
    statements = [
        "CREATE PROTO BUNDLE(examples.stream.test.ArrayWrapper)",
        (
            "CREATE TABLE merge_error_array("
            "id STRING(36) NOT NULL,"
            " value `examples.stream.test.ArrayWrapper` NOT NULL,"
            " arrayvalue ARRAY<`examples.stream.test.ArrayWrapper`> NOT NULL"
            ") PRIMARY KEY(id)"
        ),
    ]
    # PROTO columns need the compiled descriptor set at DDL time.
    dirname = os.path.dirname(__file__)
    filename = os.path.join(dirname, "testdata/descriptors.pb")
    with open(filename, "rb") as proto_descriptor_file:
        proto_descriptors = proto_descriptor_file.read()
    operation = instance.database(
        database_id=DATABASE_ID,
        ddl_statements=statements,
        proto_descriptors=proto_descriptors,
    ).create()
    operation.result(30)  # 30s timeout.
    print("Done.")
    # The create() operation returns a
    # google.cloud.spanner_admin_database_v1.types.spanner_database_admin.Database,
    # but we want a google.cloud.spanner_v1.database.Database, so fetch a
    # fresh handle instead.
    return instance.database(database_id=DATABASE_ID)

def insert_large_row(database: spanner_v1.database.Database) -> None:
    """Insert a large row with proto data."""
    # Make the row large (100 strings of 8,000 characters each, roughly
    # 800 KB per column) so the server splits the result into multiple chunks.
    string_len = 8000
    string_count = 100
    letters = string.ascii_lowercase
    row_id = str(uuid.uuid4())
    value = wrapper_pb2.ArrayWrapper(
        values=["a" * string_len for _ in range(string_count)]
    )
    array_value = [
        wrapper_pb2.ArrayWrapper(values=[letters[i % len(letters)] * string_len])
        for i in range(string_count)
    ]
    data = {
        "id": row_id,
        "value": value,
        "arrayvalue": array_value,
    }
    print(f"insert {row_id} with {len(value.values)} values")
    with database.batch() as batch:
        batch.insert(
            table="merge_error_array",
            columns=list(data.keys()),
            values=[list(data.values())],
        )

def main():
    database = create_database()
    insert_large_row(database)
    # Read the row back. This is where the error is thrown: consuming the
    # stream forces the client to merge the chunked proto values.
    with database.snapshot() as snapshot:
        # Convert to a list to make sure all results are consumed.
        results = list(
            snapshot.execute_sql(
                "SELECT id, arrayvalue FROM merge_error_array",
                column_info={"arrayvalue": wrapper_pb2.ArrayWrapper()},
            )
        )
        print(f"Returned {len(results)} results")
        for row in results:
            print("id: {}, arrayvalue: {}".format(*row))
if __name__ == "__main__":
    main()