Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save Joshua1989/dc7e60aa487430ea704a8cb3f2c5d6a6 to your computer and use it in GitHub Desktop.
Save Joshua1989/dc7e60aa487430ea704a8cb3f2c5d6a6 to your computer and use it in GitHub Desktop.
File transfer between the Google Colab VM and Google Drive
Display the source blob
Display the rendered blob
Raw
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
# !pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
import os
import subprocess
from pathlib import Path
# Public names exported by `from <module> import *`.
__all__ = [
'create_archive',
'extract_archive',
'GoogleDriveHandler'
]
def create_archive(zip_name, local_file_paths, temp_folder='/tmp', verbose=False):
    """Pack existing files/directories into a gzip-compressed tar archive.

    Args:
        zip_name: File name of the archive; '.tar.gz' is appended unless the
            name already ends with it.
        local_file_paths: Iterable of path strings to archive. Paths that do
            not exist are reported on stdout and skipped.
        temp_folder: Directory in which the archive is written.
        verbose: When true, print the tar command line and tar's own output.

    Returns:
        The path of the created archive, '<temp_folder>/<zip_name>'.

    Raises:
        ValueError: If none of the given paths exist.
        subprocess.CalledProcessError: If the tar command fails.
    """
    if not zip_name.endswith('.tar.gz'):
        zip_name += '.tar.gz'
    zip_name = '{0}/{1}'.format(temp_folder, zip_name)

    # Filter out non-existing files and directories
    existing = []
    for f in local_file_paths:
        if Path(f).exists():
            existing.append(f)
        else:
            print('file {0} does not exist, ignore it'.format(f))
    if not existing:
        raise ValueError('none of the given paths exist, nothing to archive')

    # Find the common leading directory so the archive does not contain
    # needlessly deep folder levels. The character-wise prefix is cut back to
    # the last '/' so that it always names a directory; when the paths share
    # no directory at all the prefix is empty and tar runs from '.'.
    head, sep, _ = os.path.commonprefix(existing).rpartition('/')
    common_prefix = head + sep
    skip = len(common_prefix)
    # A path equal to the common directory itself becomes '.', i.e. the whole
    # directory, instead of an empty member name.
    members = [f[skip:] or '.' for f in existing]

    # Pass the arguments as a list (no shell) so paths containing spaces or
    # shell metacharacters are handled literally.
    cmd = ['tar', '-czvf', zip_name, '-C', common_prefix or '.'] + members
    if verbose:
        print('ignore the common prefix {0}'.format(common_prefix))
        print('running shell command:', '\n' + ' '.join(cmd))
    result = subprocess.check_output(cmd).decode('utf-8')
    if verbose:
        print(result)
    # Return the path of the tar.gz file
    return zip_name
def extract_archive(zip_path, target_folder='./', verbose=False):
    """Extract a tar archive into an existing folder.

    Args:
        zip_path: Path of the archive to extract.
        target_folder: Directory to extract into (passed to tar's -C option).
        verbose: When true, print the tar command line and tar's output.

    Raises:
        subprocess.CalledProcessError: If the tar command fails.
    """
    # Pass the arguments as a list (no shell) so paths containing spaces or
    # shell metacharacters are handled literally.
    cmd = ['tar', '-xf', str(zip_path), '-C', str(target_folder)]
    if verbose:
        print('running shell command:', '\n' + ' '.join(cmd))
    result = subprocess.check_output(cmd).decode('utf-8')
    if verbose:
        print(result)
class GoogleDriveHandler:
    """Thin wrapper around a PyDrive `GoogleDrive` client for use in Colab.

    Drive items are addressed by '/'-separated paths of item titles, resolved
    relative to a parent folder id (the Drive root by default).
    """

    # MIME type that marks a Drive item as a folder.
    _FOLDER_TYPE = 'application/vnd.google-apps.folder'

    def __init__(self):
        """Authenticate the Colab user and build the PyDrive client."""
        auth.authenticate_user()
        gauth = GoogleAuth()
        gauth.credentials = GoogleCredentials.get_application_default()
        self.drive = GoogleDrive(gauth)

    def path_to_id(self, rel_path, parent_folder_id='root'):
        """Resolve a '/'-separated path of titles to a Drive item id.

        Empty path components (leading, trailing or doubled '/') are ignored,
        so an empty path resolves to `parent_folder_id` itself.

        Raises:
            Exception: If a component is not found in its parent folder.
        """
        current_id = parent_folder_id
        for part in filter(None, rel_path.split('/')):
            by_title = {f['title']: f for f in self.list_folder(current_id)}
            if part not in by_title:
                raise Exception('{0} not exist'.format(part))
            current_id = by_title[part]['id']
        return current_id

    def list_folder(self, root_folder_id='root', max_depth=0):
        """List the non-trashed items directly inside a folder.

        Returns:
            A list of dicts with keys 'title', 'id', 'link' and 'mimeType'.
            Folders additionally get a 'children' list (same structure) while
            `max_depth` is greater than zero.
        """
        query = "'{0}' in parents and trashed=false".format(root_folder_id)
        entries = []
        for f in self.drive.ListFile({'q': query}).GetList():
            entry = {
                'title': f['title'],
                'id': f['id'],
                'link': f['alternateLink'],
                'mimeType': f['mimeType'],
            }
            if f['mimeType'] == self._FOLDER_TYPE and max_depth > 0:
                entry['children'] = self.list_folder(f['id'], max_depth - 1)
            entries.append(entry)
        return entries

    def create_folder(self, folder_name, parent_path=''):
        """Create a folder under `parent_path` unless it already exists.

        Returns:
            The id of the new folder, or of the existing one with that title.

        Raises:
            Exception: If a non-folder item with that title already exists.
        """
        parent_folder_id = self.path_to_id(parent_path)
        by_title = {f['title']: f for f in self.list_folder(parent_folder_id)}
        existing = by_title.get(folder_name)
        if existing is None:
            folder = self.drive.CreateFile(
                {
                    'title': folder_name,
                    'mimeType': self._FOLDER_TYPE,
                    'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
                }
            )
            folder.Upload()
            return folder['id']
        if existing['mimeType'] != self._FOLDER_TYPE:
            raise Exception('{0} already exists as a file'.format(folder_name))
        print('{0} already exists'.format(folder_name))
        return existing['id']

    def upload(self, local_file_path, parent_path='', overwrite=True):
        """Upload a local file into the Drive folder at `parent_path`.

        With `overwrite` set, an existing non-folder item with the same title
        is deleted first; a folder with that title is left untouched.

        Returns:
            The id of the uploaded file.
        """
        parent_folder_id = self.path_to_id(parent_path)
        by_title = {f['title']: f for f in self.list_folder(parent_folder_id)}
        file_name = os.path.basename(local_file_path)
        existing = by_title.get(file_name)
        if overwrite and existing is not None and existing['mimeType'] != self._FOLDER_TYPE:
            # list_folder() returns plain dicts, which have no Delete();
            # obtain a Drive file handle from the id to delete the old copy.
            self.drive.CreateFile({'id': existing['id']}).Delete()
        new_file = self.drive.CreateFile(
            {
                'title': file_name,
                'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
            }
        )
        new_file.SetContentFile(local_file_path)
        new_file.Upload()
        return new_file['id']

    def download(self, local_file_path, target_path):
        """Download the Drive item at `target_path` to `local_file_path`."""
        target_id = self.path_to_id(target_path)
        remote_file = self.drive.CreateFile({'id': target_id})
        remote_file.GetContentFile(local_file_path)

Run the following code first to install the necessary libraries and perform authorization.

# Add the alessandro-strada PPA and install google-drive-ocamlfuse and fuse.
!apt-get install -y -qq software-properties-common python-software-properties module-init-tools
!add-apt-repository -y ppa:alessandro-strada/ppa 2>&1 > /dev/null
!apt-get update -qq 2>&1 > /dev/null
!apt-get -y install -qq google-drive-ocamlfuse fuse
# Authenticate the Colab user and fetch the application-default credentials.
from google.colab import auth
auth.authenticate_user()
from oauth2client.client import GoogleCredentials
creds = GoogleCredentials.get_application_default()
import getpass
# First headless run: only the output line containing "URL" is shown.
!google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret} < /dev/null 2>&1 | grep URL
# Read the verification code without echoing it, then pipe it into a second headless run.
vcode = getpass.getpass()
!echo {vcode} | google-drive-ocamlfuse -headless -id={creds.client_id} -secret={creds.client_secret}

Click the link, copy the verification code, and paste it into the text box. Once the authorization process is complete,

mount your Google Drive:

# Create the mount point (no error if it already exists) and mount Google Drive on it.
!mkdir -p drive
!google-drive-ocamlfuse drive

There are several approaches to transferring files:

  • Mount Google Drive in local Colab VM
  • Upload and download via browser
  • Use colab_util.py in python script
from google.colab import files
# Upload local files to the Colab VM
uploaded = files.upload()
# Download a file from the Colab VM to the local machine
files.download('target_file_name')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment