Source code for ibllib.io.globus

import re
import sys
import os
from pathlib import Path

import globus_sdk as globus
from iblutil.io import params


def as_globus_path(path):
    """
    Convert a path into one suitable for the Globus TransferClient.

    A path that is already in the expected form is returned unchanged: on Windows/Cygwin
    that is a string starting with '/<uppercase drive letter>' followed by '/' or the end of
    the string; on any other platform it is any absolute path.  Everything else is resolved
    to an absolute path and, if it carries a drive, rewritten as '/<drive letter>/...'.

    NB: If using a tilde in the path, the home folder of your Globus Connect instance must be
    the same as the OS home dir.

    :param path: A path str or Path instance
    :return: A formatted path string

    Examples:
        # A Windows path (result when run on Windows)
        >>> as_globus_path('E:\\FlatIron\\integration')
        '/E/FlatIron/integration'

        # A relative POSIX path (result depends on the current working directory)
        >>> as_globus_path('../data/integration')
        '/mnt/data/integration'

        # A globus path
        >>> as_globus_path('/E/FlatIron/integration')
        '/E/FlatIron/integration'
    """
    path = str(path)
    if sys.platform in ('win32', 'cygwin'):
        # On Windows a Globus path looks like '/E' or '/E/...'
        already_formatted = re.match(r'/[A-Z]($|/)', path) is not None
    else:
        already_formatted = Path(path).is_absolute()
    if already_formatted:
        return path

    resolved = Path(path).resolve()
    if not resolved.drive:
        return str(resolved)
    # Drop the first colon only (the one after the drive letter) and prepend the root slash
    return '/' + resolved.as_posix().replace(':', '', 1)
def _login(globus_client_id, refresh_tokens=False):
    """
    Run the Globus native-app OAuth2 flow interactively and return the transfer tokens.

    The authorization URL is printed to stdout and the user is prompted to paste back the
    code obtained after logging in.

    :param globus_client_id: The client ID passed to globus.NativeAppAuthClient
    :param refresh_tokens: Forwarded to oauth2_start_flow as its refresh_tokens argument
    :return: A dict with the keys 'refresh_token', 'access_token' and 'expires_at_seconds',
     taken from the 'transfer.api.globus.org' entry of the token response
    """
    auth_client = globus.NativeAppAuthClient(globus_client_id)
    auth_client.oauth2_start_flow(refresh_tokens=refresh_tokens)

    url = auth_client.oauth2_get_authorize_url()
    print('Please go to this URL and login: {0}'.format(url))
    code = input('Please enter the code you get after login here: ').strip()

    response = auth_client.oauth2_exchange_code_for_tokens(code)
    transfer_tokens = response.by_resource_server['transfer.api.globus.org']
    # Keep only the fields this module persists and later reads back
    wanted = ('refresh_token', 'access_token', 'expires_at_seconds')
    return {key: transfer_tokens[key] for key in wanted}
def login(globus_client_id):
    """
    Log in interactively and return a Globus TransferClient authorized with an access token.

    No refresh token is requested, and nothing is saved by this function.

    :param globus_client_id: The client ID used for the interactive login
    :return: A globus.TransferClient instance using an AccessTokenAuthorizer
    """
    access_token = _login(globus_client_id, refresh_tokens=False)['access_token']
    return globus.TransferClient(authorizer=globus.AccessTokenAuthorizer(access_token))
def setup(globus_client_id, str_app='globus/default'):
    """
    Log in interactively, requesting refresh tokens, and save the tokens with params.write.

    Consents can be looked up and managed at https://auth.globus.org/v2/web/consents

    :param globus_client_id: The client ID used for the interactive login
    :param str_app: The parameter name under which the tokens are written
    """
    params.write(str_app, _login(globus_client_id, refresh_tokens=True))
def login_auto(globus_client_id, str_app='globus/default'):
    """
    Return a Globus TransferClient authorized with a previously saved refresh token.

    The tokens are loaded with params.read; they are expected to have been saved beforehand
    by ibllib.io.globus.setup.

    :param globus_client_id: The client ID passed to globus.NativeAppAuthClient
    :param str_app: The parameter name from which the tokens are read
    :return: A globus.TransferClient instance using a RefreshTokenAuthorizer
    :raises ValueError: If nothing was loaded or any of the fields 'refresh_token',
     'access_token' or 'expires_at_seconds' is missing
    """
    saved = params.read(str_app, {})
    expected = {'refresh_token', 'access_token', 'expires_at_seconds'}
    # Check emptiness first so that as_dict is never called on an empty/None result
    if not saved or not expected.issubset(saved.as_dict()):
        raise ValueError("Token file doesn't exist, run ibllib.io.globus.setup first")

    auth_client = globus.NativeAppAuthClient(globus_client_id)
    auth_client.oauth2_start_flow(refresh_tokens=True)
    refresh_authorizer = globus.RefreshTokenAuthorizer(saved.refresh_token, auth_client)
    return globus.TransferClient(authorizer=refresh_authorizer)
def get_local_endpoint():
    """
    Read the local Globus endpoint ID from the 'client-id.txt' file.

    The file is looked up in '%LOCALAPPDATA%/Globus Connect' on Windows/Cygwin and in
    '~/.globusonline/lta' on every other platform.

    :return: The content of the file, stripped of leading and trailing whitespace
    """
    if sys.platform in ('win32', 'cygwin'):
        config_dir = Path(os.environ['LOCALAPPDATA']) / "Globus Connect"
    else:
        config_dir = Path.home() / ".globusonline" / "lta"
    return (config_dir / "client-id.txt").read_text().strip()