hops package

Submodules

hops.devices module

Utility functions to retrieve information about available devices in the environment.

hops.devices.get_num_gpus()

Get the number of GPUs available in the environment, and consequently available to the application. Assuming there is one GPU in the environment:

>>> from hops import devices
>>> devices.get_num_gpus()
>>> 1
Returns:
Number of GPUs available in the environment

hops.experiment module

Experiment module used for running Experiments, Parallel Experiments (hyperparameter optimization) and Distributed Training on Hopsworks.

The programming model is that you wrap the code to run inside a wrapper function. Inside that wrapper function, provide all imports and parts that make up your experiment; see the examples below. Whenever a function to run an experiment is invoked, it is also registered in the Experiments service.

Three different types of experiments
  • Run a single standalone Experiment using the launch function.
  • Run Parallel Experiments performing hyperparameter optimization using grid_search or differential_evolution.
  • Run single or multi-machine Distributed Training using parameter_server, collective_all_reduce or mirrored.
class hops.experiment.Direction
MAX = 'MAX'
MIN = 'MIN'
hops.experiment.collective_all_reduce(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Sets up the cluster to run CollectiveAllReduceStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...CollectiveAllReduceStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.collective_all_reduce(distributed_training, local_logdir=True)
Args:
map_fun:the function containing code to run CollectiveAllReduceStrategy
name:the name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:a longer description for the experiment
evaluator:whether to run one of the workers as an evaluator
Returns:
HDFS path in your project where the experiment is stored and return value from the process running as chief
hops.experiment.differential_evolution(objective_function, boundary_dict, direction='MAX', generations=4, population=6, mutation=0.5, crossover=0.7, name='no-name', local_logdir=False, description=None, optimization_key='metric')

Parallel Experiment

Run differential evolution to explore a given search space for each hyperparameter and figure out the best hyperparameter combination. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    import tensorflow
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction=Direction.MAX)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.differential_evolution(train_nn, boundary_dict, direction=Direction.MAX, optimization_key='accuracy')
Args:
objective_function:
 the function to run, must return a metric
boundary_dict:a dict where each key corresponds to an argument of objective_function and the corresponding value is a list of two elements: the first element is the lower bound for the parameter and the second the upper bound.
direction:Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric
generations:number of generations
population:size of population
mutation:mutation rate to explore more different hyperparameters
crossover:how fast to adapt the population to the best in each generation
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:a longer description for the experiment
optimization_key:
 When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value
Returns:
HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics
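
A typical call unpacks the three returned values; a minimal sketch (assuming train_nn and boundary_dict are defined as in the examples above, and assuming the values are returned as a tuple in the order listed):

>>> from hops import experiment
>>> from hops.experiment import Direction
>>> log_dir, best_params, best_metrics = experiment.differential_evolution(
>>>     train_nn, boundary_dict, direction=Direction.MAX, optimization_key='accuracy')
>>> print(best_params, best_metrics)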

hops.experiment.grid_search(map_fun, grid_dict, direction='MAX', name='no-name', local_logdir=False, description=None, optimization_key='metric')

Parallel Experiment

Run grid search to explore a predefined set of hyperparameter combinations. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> grid_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.grid_search(train_nn, grid_dict, direction=Direction.MAX)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> grid_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.grid_search(train_nn, grid_dict, direction=Direction.MAX, optimization_key='accuracy')
Args:
map_fun:the function to run, must return a metric
grid_dict:a dict with a key for each argument with a corresponding value being a list containing the hyperparameters to test, internally all possible combinations will be generated and run as separate Experiments
direction:Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:a longer description for the experiment
optimization_key:
 When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value
Returns:
HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics
hops.experiment.launch(map_fun, args_dict=None, name='no-name', local_logdir=False, description=None, metric_key=None)

Experiment or Parallel Experiment

Run an Experiment contained in map_fun one time with no arguments or multiple times with different arguments if args_dict is specified.

Example usage:

>>> from hops import experiment
>>> def train_nn():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>> experiment.launch(train_nn)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> def train_nn():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> experiment.launch(train_nn)
Args:
map_fun:The function to run
args_dict:If specified will run the same function multiple times with different arguments, {‘a’:[1,2], ‘b’:[5,3]} would run the function two times with arguments (1,5) and (2,3) provided that the function signature contains two arguments like def func(a,b):
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:A longer description for the experiment
metric_key:If returning a dict with multiple return values, this key should match the name of the key in the dict for the metric you want to associate with the experiment
Returns:
HDFS path in your project where the experiment is stored
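
For example, the args_dict behaviour described above can be sketched as follows (the wrapper function and the metric it returns are illustrative placeholders):

>>> from hops import experiment
>>> def train(learning_rate, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Train and evaluate your model here, then return a metric (placeholder)
>>>    return accuracy
>>> # Runs train(0.1, 0.5) and train(0.3, 0.9) as two separate executions
>>> experiment.launch(train, args_dict={'learning_rate': [0.1, 0.3], 'dropout': [0.5, 0.9]})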
hops.experiment.mirrored(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Example usage:

>>> from hops import experiment
>>> def mirrored_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...MirroredStrategy()...
>>> experiment.mirrored(mirrored_training, local_logdir=True)
Args:
map_fun:contains the code where you are using MirroredStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:a longer description for the experiment
evaluator:whether to run one of the workers as an evaluator
Returns:
HDFS path in your project where the experiment is stored and return value from the process running as chief
hops.experiment.parameter_server(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Sets up the cluster to run ParameterServerStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...ParameterServerStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.parameter_server(distributed_training, local_logdir=True)
Args:
map_fun:contains the code where you are using ParameterServerStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:a longer description for the experiment
evaluator:whether to run one of the workers as an evaluator
Returns:
HDFS path in your project where the experiment is stored and return value from the process running as chief

Parallel Experiment

Run an Experiment contained in map_fun for a number of random samples controlled by the samples parameter. Each hyperparameter is contained in boundary_dict with the key corresponding to the name of the hyperparameter and a list containing two elements defining the lower and upper bound. The experiment must return a metric corresponding to how ‘good’ the given hyperparameter combination is.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max')

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max', optimization_key='accuracy')
Args:
map_fun:The function to run
boundary_dict:dict containing hyperparameter names and the corresponding boundaries; each experiment randomizes a value within the boundary range.
direction:Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric
samples:the number of random samples to evaluate for each hyperparameter given the boundaries, for example samples=3 would result in 3 hyperparameter combinations in total to evaluate
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
description:A longer description for the experiment
optimization_key:
 When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value
Returns:
HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics

hops.model module

The model module is used for exporting, versioning and attaching metadata to models. In pipelines it should be used to query the Model Repository for the best model version to use during inference.

class hops.model.Metric
MAX = 'MAX'
MIN = 'MIN'
exception hops.model.ModelNotFound

Bases: exceptions.Exception

This exception will be raised if the requested model could not be found

hops.model.export(model_path, model_name, model_version=None, overwrite=False, metrics=None, description=None, synchronous=True, synchronous_timeout=120)

Copies a trained model to the Models directory in the project and creates the directory structure of:

>>> Models
>>>      |
>>>      - model_name
>>>                 |
>>>                 - version_x
>>>                 |
>>>                 - version_y

For example if you run this:

>>> from hops import model
>>> model.export("iris_knn.pkl", "irisFlowerClassifier", metrics={'accuracy': accuracy})

It will copy the local model file “iris_knn.pkl” to /Projects/projectname/Models/irisFlowerClassifier/1/iris_knn.pkl in HDFS, overwriting any file with the same name already in that directory.

If “model” is a directory on the local path exported by TensorFlow, and you run:

>>> model.export("/model", "mnist", metrics={'accuracy': accuracy, 'loss': loss})

It will copy the model directory contents to /Projects/projectname/Models/mnist/1/, e.g. the “model.pb” file and the “variables” directory.

Args:
model_path:path to the trained model (HDFS or local)
model_name:name of the model
model_version:version of the model
overwrite:boolean flag whether to overwrite in case a model already exists in the exported directory
metrics:dict of evaluation metrics to attach to model
description:description about the model
synchronous:whether to synchronously wait for the model to be indexed in the models rest endpoint
synchronous_timeout:
 max timeout in seconds for waiting for the model to be indexed
Returns:
The path to where the model was exported
Raises:
ValueError:if there was an error exporting the model due to invalid user input
ModelNotFound:if the model was not found
hops.model.get_best_model(name, metric, direction)

Get the best model version by sorting on attached metadata such as model accuracy.

For example if you run this:

>>> from hops import model
>>> from hops.model import Metric
>>> model.get_best_model('mnist', 'accuracy', Metric.MAX)

It will return the mnist version where the ‘accuracy’ is the highest.

Args:
name:name of the model
metric:name of the metric to compare
direction:whether metric should be maximized or minimized to find the best model
Returns:
The best model
Raises:
ModelNotFound:if the model was not found
hops.model.get_model(name, version)

Get a specific model version given a model name and a version.

For example if you run this:

>>> from hops import model
>>> model.get_model('mnist', 1)

You will get version 1 of the model ‘mnist’

Args:
name:name of the model
version:version of the model
Returns:
The specified model version
Raises:
ModelNotFound:if the model was not found

hops.numpy_helper module

API for reading/writing numpy arrays to/from HDFS
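
A minimal save/load round trip using the functions below (the Resources path and the array contents are illustrative):

>>> import numpy as np
>>> from hops import numpy_helper
>>> arr = np.arange(12).reshape(3, 4)
>>> numpy_helper.save("Resources/my_array.npy", arr)
>>> loaded = numpy_helper.load("Resources/my_array.npy")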

hops.numpy_helper.fromfile(hdfs_filename, **kwds)

Construct an array from data in a text or binary file.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.fromfile(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.fromregex(hdfs_filename, **kwds)

Construct an array from a text file, using regular expression parsing.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.fromregex(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.genfromtxt(hdfs_filename, **kwds)

Load data from a HDFS text file, with missing values handled as specified. Each line past the first skip_header lines is split at the delimiter character, and characters following the comments character are discarded.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.genfromtxt(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.load(hdfs_filename, **kwds)

Reads a file from HDFS into a Numpy Array

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.load(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.loadtxt(hdfs_filename, **kwds)

Load data from a text file in HDFS into a Numpy Array. Each row in the text file must have the same number of values.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.loadtxt(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.memmap(hdfs_filename, **kwds)

Create a memory-map to an array stored in a binary file on disk.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.memmap(…)
Returns:
A memmap with dtype and shape that matches the data.
Raises:
IOError: If the file does not exist
hops.numpy_helper.save(hdfs_filename, data)

Saves a numpy array to a file in HDFS

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
data:numpy array
Raises:
IOError: If the local file does not exist
hops.numpy_helper.savez(hdfs_filename, *args, **kwds)

Save several arrays into a single file in uncompressed .npz format in HDFS. If arguments are passed in with no keywords, the corresponding variable names in the .npz file are ‘arr_0’, ‘arr_1’, etc. If keyword arguments are given, the corresponding variable names in the .npz file will match the keyword names.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
args:Arrays to save to the file (positional, optional). Since it is not possible for Python to know the names of the arrays outside savez, they will be saved with the names ‘arr_0’, ‘arr_1’, and so on. These arguments can be any expression.
kwds:Arrays to save to the file (keyword, optional). Arrays will be saved in the file with the keyword names.

Returns: None

Raises:
IOError: If the local file does not exist
hops.numpy_helper.savez_compressed(hdfs_filename, *args, **kwds)

Save several arrays into a single file in compressed .npz format in HDFS. If arguments are passed in with no keywords, the corresponding variable names in the .npz file are ‘arr_0’, ‘arr_1’, etc. If keyword arguments are given, the corresponding variable names in the .npz file will match the keyword names.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
*args:Arrays to save to the file (positional, optional). Since it is not possible for Python to know the names of the arrays outside savez_compressed, they will be saved with the names ‘arr_0’, ‘arr_1’, and so on. These arguments can be any expression.
**kwds:Arrays to save to the file (keyword, optional). Arrays will be saved in the file with the keyword names.

Returns: None

Raises:
IOError: If the local file does not exist

hops.pandas_helper module

API for opening csv files into Pandas from HDFS
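
A minimal write/read round trip using the functions below (the Resources path and the dataframe contents are illustrative):

>>> import pandas as pd
>>> from hops import pandas_helper
>>> df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
>>> pandas_helper.write_csv("Resources/my_data.csv", df)
>>> df2 = pandas_helper.read_csv("Resources/my_data.csv")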

hops.pandas_helper.read_csv(hdfs_filename, **kwds)

Reads a comma-separated values (csv) file from HDFS into a Pandas DataFrame

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
**kwds:You can add any additional args found in pandas.read_csv(…)
Returns:
A pandas dataframe
Raises:
IOError: If the file does not exist
hops.pandas_helper.read_excel(hdfs_filename, **kwds)

Reads an Excel file from HDFS into a Pandas DataFrame

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
**kwds:You can add any additional args found in pandas.read_excel(…)
Returns:
A pandas dataframe
Raises:
IOError: If the file does not exist
hops.pandas_helper.read_json(hdfs_filename, **kwds)

Reads a JSON file from HDFS into a Pandas DataFrame.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
**kwds:You can add any additional args found in pandas.read_json(…)
Returns:
A pandas dataframe
Raises:
IOError: If the file does not exist
hops.pandas_helper.read_parquet(hdfs_filename, **kwds)

Load a parquet object from a HDFS path, returning a DataFrame.

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
**kwds:You can add any additional args found in pandas.read_parquet(…)
Returns:
A pandas dataframe
Raises:
IOError: If the file does not exist
hops.pandas_helper.write_csv(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a comma-separated values (csv) text file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
dataframe:a Pandas dataframe
**kwds:You can add any additional args found in pandas.to_csv(…)
Raises:
IOError: If the file does not exist
hops.pandas_helper.write_json(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a JSON file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
dataframe:a Pandas dataframe
**kwds:You can add any additional args found in pandas.to_json(…)
Raises:
IOError: If the file does not exist
hops.pandas_helper.write_parquet(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a parquet file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
dataframe:a Pandas dataframe
**kwds:You can add any additional args found in pandas.to_parquet(…)
Raises:
IOError: If the file does not exist

hops.featurestore module

hops.hdfs module

API for interacting with the file system on Hops (HopsFS).

It is a wrapper around pydoop together with utility functions that are Hops-specific.
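
A few common operations combined, all documented below (the paths are illustrative and assume a Resources dataset in your project):

>>> from hops import hdfs
>>> project_root = hdfs.project_path()
>>> hdfs.mkdir("Resources/my_dir")
>>> hdfs.dump("hello hops", "Resources/my_dir/greeting.txt")
>>> if hdfs.exists("Resources/my_dir/greeting.txt"):
>>>     print(hdfs.load("Resources/my_dir/greeting.txt"))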

hops.hdfs.abs_path(hdfs_path)

Return an absolute path for hdfs_path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
Return an absolute path for hdfs_path.
hops.hdfs.access(hdfs_path, mode, project=None)

Perform the equivalent of os.access() on path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if access is allowed, False if not.
hops.hdfs.add_module(hdfs_path, project=None)

Add a .py or .ipynb file from HDFS to sys.path

For example, if you execute:

>>> add_module("Resources/my_module.py")
>>> add_module("Resources/my_notebook.ipynb")

You can import it simply as:

>>> import my_module
>>> import my_notebook
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS) to a .py or .ipynb file
Returns:
Return full local path to localized python file or converted python file in case of .ipynb file
hops.hdfs.capacity()

Returns the raw capacity of the filesystem

Returns:
filesystem capacity (int)
hops.hdfs.chmod(hdfs_path, mode, project=None)

Change file mode bits.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.chown(hdfs_path, user, group, project=None)

Change file owner and group.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to the given project path in HDFS).
user:New hdfs username
group:New hdfs group
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.close()

Closes the HDFS connection (disconnects from the namenode)

hops.hdfs.copy_to_hdfs(local_path, relative_hdfs_path, overwrite=False, project=None)

Recursively copies a path from the local filesystem (relative to $CWD) to a path in the HDFS project (relative_hdfs_path)

For example, if you execute:

>>> copy_to_hdfs("data.tfrecords", "/Resources", project="demo")

This will copy the file data.tfrecords to hdfs://Projects/demo/Resources/data.tfrecords

Args:
local_path:Absolute or relative path on the local filesystem to copy
relative_hdfs_path:
 a path in HDFS relative to the project root to where the local path should be written
overwrite:a boolean flag whether to overwrite if the path already exists in HDFS
project:name of the project, defaults to the current HDFS user’s project
hops.hdfs.copy_to_local(hdfs_path, local_path='', overwrite=False, project=None)

Copies a directory or file from a HDFS project to a local private scratch directory. If there is not enough space on the local scratch directory, an exception is thrown. If the local file exists, and the hdfs file and the local file are the same size in bytes, return ‘ok’ immediately. If the local directory tree exists, and the hdfs subdirectory and the local subdirectory have the same files and directories, return ‘ok’ immediately.

For example, if you execute:

>>> copy_to_local("Resources/my_data")

This will copy the directory my_data from the Resources dataset in your project to the current working directory on the path ./my_data

Raises:
IOError if there is not enough space to localize the file/directory in HDFS to the scratch directory ($PDIR)
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
local_path:the relative or full path to a directory on the local filesystem to copy to (relative to a scratch directory $PDIR), defaults to $CWD
overwrite:a boolean flag whether to overwrite if the path already exists in the local scratch directory.
project:name of the project, defaults to the current HDFS user’s project
Returns:
the full local pathname of the file/dir
hops.hdfs.cp(src_hdfs_path, dest_hdfs_path, overwrite=False)

Copy the contents of src_hdfs_path to dest_hdfs_path.

If src_hdfs_path is a directory, its contents will be copied recursively. Source file(s) are opened for reading and copies are opened for writing.

Args:
src_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
overwrite:boolean flag whether to overwrite destination path or not.
hops.hdfs.delete(hdfs_path, recursive=False)

Deletes path, path can be absolute or relative. If recursive is set to True and path is a directory, then files will be deleted recursively.

For example

>>> delete("/Resources/", recursive=True)

will delete all files recursively in the folder “Resources” inside the current project.

Args:
hdfs_path:the path to delete (project-relative or absolute)
Returns:
None
Raises:
IOError when recursive is False and directory is non-empty
hops.hdfs.dump(data, hdfs_path)

Dumps data to a file

Args:
data:data to write to hdfs_path
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.exists(hdfs_path, project=None)

Return True if hdfs_path exists in the default HDFS.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if hdfs_path exists.

Raises: IOError

hops.hdfs.get()

Get a handle to pydoop hdfs using the default namenode (specified in hadoop config)

Returns:
Pydoop hdfs handle
hops.hdfs.get_fs()

Get a handle to pydoop fs using the default namenode (specified in hadoop config)

Returns:
Pydoop fs handle
hops.hdfs.get_plain_path(abs_path)

Convert absolute HDFS/HOPSFS path to plain path (dropping prefix and ip)

Example use-case:

>>> hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
>>> # returns: "/Projects/demo_deep_learning_admin000/Models/"
Args:
abs_path:the absolute HDFS/hopsfs path containing prefix and/or ip
Returns:
the plain path without prefix and ip
hops.hdfs.get_webhdfs_host()

Reads the ipc.server.ssl.enabled property from core-site.xml and returns webhdfs hostname.

Returns:
returns the webhdfs hostname.
hops.hdfs.get_webhdfs_port()

Reads the ipc.server.ssl.enabled property from core-site.xml and returns the webhdfs port.

Returns:
returns the webhdfs port.
hops.hdfs.glob(hdfs_path, recursive=False, project=None)

Finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order.

Globbing gives you the list of files in a dir that matches a supplied pattern

>>> glob('Resources/*.json')
>>> ['Resources/1.json', 'Resources/2.json']

The standard glob is implemented with os.listdir() and fnmatch.fnmatch(); here, glob is implemented with hdfs.ls() and fnmatch.filter().

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Raises:
IOError if the supplied hdfs path does not exist
Returns:
A possibly-empty list of path names that match hdfs_path, which must be a string containing a path specification. hdfs_path can be either absolute or relative (relative to your Project’s path in HDFS).
hops.hdfs.is_tls_enabled()

Reads the ipc.server.ssl.enabled property from core-site.xml.

Returns:
returns True if ipc.server.ssl.enabled is true. False otherwise.
hops.hdfs.isdir(hdfs_path, project=None)

Return True if path refers to a directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a directory.

Raises: IOError

hops.hdfs.isfile(hdfs_path, project=None)

Return True if path refers to a file.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a file.

Raises: IOError

hops.hdfs.load(hdfs_path)

Read the content of hdfs_path and return it.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
the read contents of hdfs_path
hops.hdfs.ls(hdfs_path, recursive=False, exclude_nn_addr=False)

lists a directory in HDFS

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
returns a list of hdfs paths
hops.hdfs.lsl(hdfs_path, recursive=False, project=None)

Returns all the pathnames in the supplied directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
recursive:if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Returns:
A possibly-empty list of path names stored in the supplied path.
hops.hdfs.mkdir(hdfs_path, project=None)

Create a directory and its parents as needed.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.move(src, dest)

Move or rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.open_file(hdfs_path, project=None, flags='rw', buff_size=0)

Opens an HDFS file for read/write/append and returns a file descriptor object (fd) that should be closed when no longer needed.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
flags:supported opening modes are ‘r’, ‘w’, ‘a’. In addition, a trailing ‘t’ can be added to specify text mode (e.g. ‘rt’ = open for reading text)
buff_size:Pass 0 as buff_size if you want to use the “configured” values, i.e. the ones set in the Hadoop configuration files.
Returns:
A file descriptor (fd) that needs to be closed (fd.close()) when it is no longer needed.
Raises:
IOError: If the file does not exist.
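
A minimal sketch of reading a text file with open_file (the path is illustrative):

>>> from hops import hdfs
>>> fd = hdfs.open_file("Resources/my_file.txt", flags='rt')
>>> content = fd.read()
>>> fd.close()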
hops.hdfs.project_id()

Get the Hopsworks project id from environment variables

Returns: the Hopsworks project id

hops.hdfs.project_name()

Extracts the project name from the project username (“project__user”) or from the environment if available

Returns:
project name
hops.hdfs.project_path(project=None, exclude_nn_addr=False)

Get the path in HopsFS where the HopsWorks project is located. To point to a particular dataset, this path should be appended with the name of your dataset.

>>> from hops import hdfs
>>> project_path = hdfs.project_path()
>>> print("Project path: {}".format(project_path))
Args:
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
returns the project absolute path
hops.hdfs.project_user()

Gets the project username (“project__user”) from environment variables

Returns:
the project username
hops.hdfs.rename(src, dest)

Rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.rmr(hdfs_path, project=None)

Recursively remove files and directories.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.stat(hdfs_path)

Performs the equivalent of os.stat() on hdfs_path, returning a StatResult object.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
returns a StatResult object for hdfs_path

hops.kafka module

A module for setting up Kafka Brokers and Consumers on the Hops platform. It hides the complexity of configuring Kafka by providing utility methods such as:

  • get_broker_endpoints().
  • get_security_protocol().
  • get_kafka_default_config().
  • etc.

Using these utility functions you can set up Kafka with the Kafka client library of your choice, e.g. Spark Streaming or confluent-kafka-python. For example, assuming that you have created a topic called “test” on Hopsworks and that you have installed confluent-kafka-python inside your project’s anaconda environment:

>>> from hops import kafka
>>> from confluent_kafka import Producer, Consumer
>>> TOPIC_NAME = "test"
>>> config = kafka.get_kafka_default_config()
>>> producer = Producer(config)
>>> consumer = Consumer(config)
>>> consumer.subscribe(["test"])
>>> # wait a little while before executing the rest of the code (put it in a different Jupyter cell)
>>> # so that the consumer get chance to subscribe (asynchronous call)
>>> for i in range(0, 10):
>>>     producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)
>>> # Trigger the sending of all messages to the brokers, 10sec timeout
>>> producer.flush(10)
>>> for i in range(0, 10):
>>>     msg = consumer.poll(timeout=5.0)
>>>     if msg is not None:
>>>         print('Consumed Message: {} from topic: {}'.format(msg.value(), msg.topic()))
>>>     else:
>>>         print("Topic empty, timeout when trying to consume message")

Similarly, you can define a PySpark Kafka consumer as follows, using the Spark session defined in the variable spark:

>>> from hops import kafka
>>> from hops import tls
>>> TOPIC_NAME = "test"
>>> df = spark.readStream \
>>>     .format("kafka") \
>>>     .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
>>>     .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
>>>     .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.keystore.location", tls.get_key_store()) \
>>>     .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
>>>     .option("subscribe", TOPIC_NAME) \
>>>     .load()
class hops.kafka.KafkaTopicDTO(kafka_topic_dto_json)

Bases: object

Represents a KafkaTopic in Hopsworks

__init__(kafka_topic_dto_json)

Initialize the kafka topic from JSON payload returned by Hopsworks REST API

Args:
kafka_topic_dto_json:
 JSON data about the kafka topic returned from Hopsworks REST API
hops.kafka.convert_json_schema_to_avro(json_schema)

Parses a JSON kafka topic schema returned by Hopsworks REST API into an avro schema

Args:
json_schema:the json schema to convert
Returns:
the avro schema
hops.kafka.get_broker_endpoints()

Get Kafka broker endpoints as a comma-separated string

Returns:
a string with broker endpoints comma-separated
hops.kafka.get_broker_endpoints_list()

Get Kafka broker endpoints as a list

Returns:
a list with broker endpoint strings
hops.kafka.get_kafka_default_config()

Gets a default configuration for running secure Kafka on Hops

Returns:
dict with config_property -> value
hops.kafka.get_schema(topic)

Gets the Avro schema for a particular Kafka topic.

Args:
topic:Kafka topic name
Returns:
Avro schema as a string object in JSON format
hops.kafka.get_security_protocol()

Gets the security protocol used for communicating with Kafka brokers in a Hopsworks cluster

Returns:
the security protocol for communicating with Kafka brokers in a Hopsworks cluster
hops.kafka.parse_avro_msg(msg, avro_schema)

Parses an avro record using a specified avro schema

Args:
msg:the avro message to parse
avro_schema:the avro schema
Returns:
The parsed/decoded message
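
For example, a message polled from the consumer in the example above can be decoded with the topic’s schema; a minimal sketch (assuming msg holds a consumed message and that the JSON schema string from get_schema is first converted with convert_json_schema_to_avro):

>>> from hops import kafka
>>> json_schema = kafka.get_schema("test")
>>> avro_schema = kafka.convert_json_schema_to_avro(json_schema)
>>> record = kafka.parse_avro_msg(msg.value(), avro_schema)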

hops.serving module

Utility functions to export models to the Models dataset and get information about models currently being served in the project.
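
A typical serving lifecycle combining the functions documented below (the model path, serving name and request data are illustrative; in practice you may want to check get_status before sending requests):

>>> from hops import serving
>>> if not serving.exists("mnist"):
>>>     serving.create_or_update("/Models/mnist", "mnist", "TENSORFLOW", 1)
>>> serving.start("mnist")
>>> response = serving.make_inference_request("mnist", [[1, 2, 3, 4]])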

class hops.serving.Serving(serving_json)

Bases: object

Represents a model being served in Hopsworks

__init__(serving_json)

Initialize the serving from JSON payload returned by Hopsworks REST API

Args:
serving_json:JSON data about the serving returned from Hopsworks REST API
exception hops.serving.ServingNotFound

Bases: exceptions.Exception

This exception will be raised if the requested serving could not be found

hops.serving.create_or_update(artifact_path, serving_name, serving_type='TENSORFLOW', model_version=1, batching_enabled=False, topic_name='CREATE', num_partitions=1, num_replicas=1, instances=1)

Creates a serving in Hopsworks if it does not exist, otherwise updates the existing one.

Example use-case:

>>> from hops import serving
>>> serving.create_or_update("/Models/mnist", "mnist", "TENSORFLOW", 1)
Args:
artifact_path:path to the artifact to serve (tf model dir or sklearn script)
serving_name:name of the serving to create
serving_type:type of the serving, e.g “TENSORFLOW” or “SKLEARN”
model_version:version of the model to serve
batching_enabled:
 boolean flag whether to enable batching for the inference requests
instances:the number of serving instances (the more instances, the more inference requests can be served in parallel)

Returns:
None
hops.serving.delete(serving_name)

Deletes serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.delete("irisFlowerClassifier")
Args:
serving_name:name of the serving to delete
Returns:
None
hops.serving.exists(serving_name)

Checks if there exists a serving with the given name

Example use-case:

>>> from hops import serving
>>> serving.exists(serving_name)
Args:
serving_name:the name of the serving
Returns:
True if the serving exists, otherwise False
hops.serving.get_all()

Gets the list of servings for the current project

Example:

>>> from hops import serving
>>> servings = serving.get_all()
>>> servings[0].name
Returns:
list of servings
hops.serving.get_artifact_path(serving_name)

Gets the artifact path of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_artifact_path(serving_name)
Args:
serving_name:name of the serving to get the artifact path for
Returns:
the artifact path of the serving (model path in case of tensorflow, or python script in case of SkLearn)
hops.serving.get_id(serving_name)

Gets the id of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_id(serving_name)
Args:
serving_name:name of the serving to get the id for
Returns:
the id of the serving, None if Serving does not exist
hops.serving.get_kafka_topic(serving_name)

Gets the kafka topic name of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_kafka_topic(serving_name)
Args:
serving_name:name of the serving to get the kafka topic name for
Returns:
the kafka topic name of the serving
hops.serving.get_status(serving_name)

Gets the status of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_status(serving_name)
Args:
serving_name:name of the serving to get the status for
Returns:
the status of the serving
hops.serving.get_type(serving_name)

Gets the type of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_type(serving_name)
Args:
serving_name:name of the serving to get the type for
Returns:
the type of the serving (e.g Tensorflow or SkLearn)
hops.serving.get_version(serving_name)

Gets the version of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_version(serving_name)
Args:
serving_name:name of the serving to get the version for
Returns:
the version of the serving
hops.serving.make_inference_request(serving_name, data, verb=':predict')

Submit an inference request

Example use-case:

>>> from hops import serving
>>> serving.make_inference_request("irisFlowerClassifier", [[1,2,3,4]], ":predict")
Args:
serving_name:name of the model being served
data:data/json to send to the serving
verb:type of request (:predict, :classify, or :regress)
Returns:
the JSON response
hops.serving.start(serving_name)

Starts a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.start("irisFlowerClassifier")
Args:
serving_name:name of the serving to start
Returns:
None
hops.serving.stop(serving_name)

Stops a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.stop("irisFlowerClassifier")
Args:
serving_name:name of the serving to stop
Returns:
None

hops.tensorboard module

Utility functions to manage the lifecycle of TensorBoard and get the path to write TensorBoard events.

hops.tensorboard.interactive_debugger()

Returns: address for interactive debugger in TensorBoard

hops.tensorboard.logdir()

Get the TensorBoard logdir. This function should be called in your wrapper function for Experiment, Parallel Experiment or Distributed Training and passed as the logdir for TensorBoard.

Case 1: local_logdir=True. The logdir is on the local filesystem. Once the experiment is finished, all files present in the directory will be uploaded to your experiment directory in the Experiments dataset.

Case 2: local_logdir=False, then the logdir is in HDFS in your experiment directory in the Experiments dataset.

Returns:
The path to store files for your experiment. The content is also visualized in TensorBoard.
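
Inside a wrapper function, the returned path can be handed to whatever writes TensorBoard events; a minimal sketch with a Keras callback (the use of tf.keras is an assumption, not part of this module):

>>> from hops import tensorboard
>>> import tensorflow as tf
>>> logdir = tensorboard.logdir()
>>> tb_callback = tf.keras.callbacks.TensorBoard(log_dir=logdir)
>>> # pass tb_callback to model.fit(..., callbacks=[tb_callback])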
hops.tensorboard.non_interactive_debugger()

Returns: address for non-interactive debugger in TensorBoard

hops.tls module

A module for getting TLS certificates in YARN containers, used for setting up Kafka inside Jobs/Notebooks on Hops.

hops.tls.get_ca_chain_location()

Get location of chain of CA certificates (PEM format) that are required to validate the private key certificate of the client used for 2-way TLS authentication, for example with Kafka cluster

Returns:
string path to ca chain of certificate
hops.tls.get_client_certificate_location()

Get location of client certificate (PEM format) for the private key signed by trusted CA used for 2-way TLS authentication, for example with Kafka cluster

Returns:
string path to client certificate in PEM format
hops.tls.get_client_key_location()

Get location of client private key (PEM format) used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to client private key in PEM format
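
These PEM files can be plugged into any client that supports 2-way TLS; a minimal sketch using the requests library (the URL is hypothetical):

>>> import requests
>>> from hops import tls
>>> r = requests.get("https://my-tls-service:9443/endpoint",
>>>                  cert=(tls.get_client_certificate_location(), tls.get_client_key_location()),
>>>                  verify=tls.get_ca_chain_location())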
hops.tls.get_key_store()
hops.tls.get_key_store_cert()

Get keystore certificate from local container

Returns:
The keystore certificate
hops.tls.get_key_store_pwd()

Get keystore password

Returns:
keystore password
hops.tls.get_trust_store()
hops.tls.get_trust_store_pwd()

Get truststore password

Returns:
truststore password

hops.util module

Miscellaneous utility functions for user applications.

hops.util.abspath(hdfs_path)

Return an absolute path for hdfs_path.

Returns:
An absolute path for hdfs_path.
hops.util.get_job_name()

If this method is called from inside a hopsworks job, it returns the name of the job.

Returns:
the name of the hopsworks job
hops.util.get_jwt()

Retrieves jwt from local container.

Returns:
Content of jwt.token file in local container.
hops.util.get_redshift_username_password(region_name, cluster_identifier, user, database)

Requests temporary Redshift credentials with a validity of 3600 seconds and the given parameters.

Args:
region_name:the AWS region name
cluster_identifier:
 the Redshift cluster identifier
user:the Redshift user to get credentials for
database:the Redshift database
Returns:
user, password
hops.util.get_requests_verify(hostname, port)

Get the verification method for sending HTTP requests to Hopsworks. Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c

Returns:
If the environment variable HOPS_UTIL_VERIFY is not false: the path to the truststore (PEM) if the Hopsworks certificate is self-signed, otherwise True. If HOPS_UTIL_VERIFY is false: False.

hops.util.get_secret(secrets_store, secret_key=None, api_key_file=None)

Returns secret value from the AWS Secrets Manager or Parameter Store

Args:
secrets_store:the underlying secrets storage to be used, e.g. secretsmanager or parameterstore
secret_key (str):
 key for the secret value, e.g. api-key, cert-key, trust-store, key-store
api_key_file:path to a file containing an api key
Returns:
str:secret value
hops.util.num_executors()

Get the number of executors configured for Jupyter

Returns:
Number of configured executors for Jupyter
hops.util.num_param_servers()

Get the number of parameter servers configured for Jupyter

Returns:
Number of configured parameter servers for Jupyter
hops.util.parse_redhift_jdbc_url(url)

Parses a Redshift JDBC URL and extracts region_name, cluster_identifier, database and user.

Args:
url:the JDBC URL
Returns:
region_name, cluster_identifier, database, user
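
The two Redshift helpers are typically combined; a minimal sketch (jdbc_url is assumed to be a Redshift JDBC URL obtained elsewhere):

>>> from hops import util
>>> region_name, cluster_identifier, database, user = util.parse_redhift_jdbc_url(jdbc_url)
>>> db_user, db_password = util.get_redshift_username_password(
>>>     region_name, cluster_identifier, user, database)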
hops.util.send_request(method, resource, data=None, headers=None, stream=False)

Sends a request to Hopsworks. In case of an Unauthorized response, the request is submitted once more, as the jwt might not have been read properly from the local container.

Args:
method:HTTP(S) method
resource:Hopsworks resource
data:HTTP(S) payload
headers:HTTP(S) headers
stream:whether to stream the response
Returns:
HTTP(S) response
hops.util.set_auth_header(headers)

Set the authorization header for HTTP requests to Hopsworks, depending on whether the setup is remote or not.

Args:
headers:the HTTP headers to update
hops.util.write_b64_cert_to_bytes(b64_string, path)

Converts a b64-encoded certificate to a bytes file.

Args:
b64_string (str):
 b64 encoded string of certificate
path (str):path where file is saved, including file name. e.g. /path/key-store.jks

hops.beam module

Utility functions to manage the lifecycle of TensorFlow Extended (TFX) and Beam.

hops.beam.create_runner(runner_name, jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1)

Create a Beam runner. Creates the job with the job type that corresponds to the requested runner

Args:
runner_name: Name of the runner.
jobmanager_heap_size: The memory (MB) of the Flink cluster JobManager.
num_of_taskmanagers: The number of TaskManagers of the Flink cluster.
taskmanager_heap_size: The memory (MB) of each TaskManager in the Flink cluster.
num_task_slots: Number of task slots of the Flink cluster.
Returns:
The runner spec.
hops.beam.exit_handler()
hops.beam.get_portable_runner_config(sdk_worker_parallelism=1, worker_threads=100, pre_optimize='all', execution_mode_for_batch='BATCH_FORCED')

Instantiate a list of pipeline configuration options for the PortableRunner.

Args:
sdk_worker_parallelism: sdk_worker_parallelism
worker_threads: worker_threads
pre_optimize: pre_optimize
execution_mode_for_batch: execution_mode_for_batch
Returns:
a list of pipeline configuration options for the PortableRunner.
hops.beam.get_sdk_worker()

Get the path to the portability framework SDK worker script.

Returns:
the path to sdk_worker.sh
hops.beam.start(runner_name='runner', jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1, cleanup_runner=False, ignore_running=False)

Creates and starts a Beam runner and then starts the beam job server.

Args:
runner_name: Name of the runner. If not specified, the default runner name “runner” will be used. If the runner already exists, it will be updated with the provided arguments. If it doesn’t exist, it will be created.
jobmanager_heap_size: The memory (MB) of the Flink cluster JobManager.
num_of_taskmanagers: The number of TaskManagers of the Flink cluster.
taskmanager_heap_size: The memory (MB) of each TaskManager in the Flink cluster.
num_task_slots: Number of task slots of the Flink cluster.
cleanup_runner: Kill the runner when Python terminates.
ignore_running: Ignore currently running instances of the runner.
Returns:
The artifact_port, expansion_port, job_host, job_port, jobserver.pid
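
A minimal sketch of starting a runner and the Beam job server (treating the return value as a tuple in the order listed above is an assumption):

>>> from hops import beam
>>> artifact_port, expansion_port, job_host, job_port, jobserver_pid = beam.start(runner_name='my-runner')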
hops.beam.start_beam_jobserver(flink_session_name, artifacts_dir='Resources', jobserver_jar=None, sdk_worker_parallelism=1)

Start the Java Beam job server that connects to the flink session cluster. User needs to provide the job name that started the Flink session and optionally the worker parallelism.

Args:
flink_session_name:
 Job name that runs the Flink session.
sdk_worker_parallelism:
 Default parallelism for SDK worker processes. This option is only applied when the pipeline option sdkWorkerParallelism is set to 0. Default is 1. If 0, worker parallelism will be dynamically decided by the runner. See also: sdkWorkerParallelism Pipeline Option (default: 1). For further documentation, please refer to the Apache Beam docs.

Returns:
artifact_port, expansion_port, job_host, job_port, jobserver.pid
hops.beam.start_runner(runner_name)

Start the runner. Submits an HTTP request to the Hopsworks REST API to start the job.

Returns:
The runner execution status.
hops.beam.stop_runner(runner_name)

Stop the runner.

Returns:
The runner execution status.

Module contents