hops package

Submodules

hops.beam module

Utility functions to manage the lifecycle of TensorFlow Extended (TFX) and Beam.

hops.beam.start_beam_jobserver(flink_session_name, artifacts_dir='Resources', jobserver_jar=None)

Start the Java Beam job server that connects to the Flink session cluster. The user needs to provide the name of the job that started the Flink session and, optionally, the worker parallelism.

Args:
flink_session_name

Job name that runs the Flink session.

artifacts_dir

Default dataset to store artifacts.

jobserver_jar

Portability framework jar filename.

Returns:

artifact_port, expansion_port, job_host, job_port, jobserver.pid
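
Example usage (a minimal sketch; "my_flink_session" is a placeholder for the name of a Flink session job you have already started in Hopsworks):

>>> from hops import beam
>>> jobserver = beam.start_beam_jobserver("my_flink_session", artifacts_dir="Resources")
>>> # the return value bundles the artifact/expansion/job ports and the job server PID listed above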

hops.beam.exit_handler()
hops.beam.get_sdk_worker()

Get the path to the portability framework SDK worker script.

Returns:

the path to sdk_worker.sh

hops.beam.create_runner(runner_name, jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1)

Create a Beam runner. Creates the job with the job type that corresponds to the requested runner.

Args:
runner_name

Name of the runner.

jobmanager_heap_size

The memory (MB) of the Flink cluster JobManager.

num_of_taskmanagers

The number of TaskManagers of the Flink cluster.

taskmanager_heap_size

The memory (MB) of each TaskManager in the Flink cluster.

num_task_slots

Number of task slots of the Flink cluster.

Returns:

The runner spec.
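
Example usage (a sketch; the runner name is arbitrary and the sizing arguments simply restate the documented defaults):

>>> from hops import beam
>>> runner_spec = beam.create_runner("my_runner", jobmanager_heap_size=1024, num_of_taskmanagers=1,
>>>                                  taskmanager_heap_size=4096, num_task_slots=1)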

hops.beam.start_runner(runner_name)

Start the runner. Submits an HTTP request to the Hopsworks REST API to start the job.

Returns:

The runner execution status.

hops.beam.stop_runner(runner_name)

Stop the runner.

Returns:

The runner execution status.

hops.beam.start(runner_name='runner', jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1, cleanup_runner=False, ignore_running=False)

Creates and starts a Beam runner and then starts the Beam job server.

Args:
runner_name

Name of the runner. If not specified, the default runner name “runner” will be used. If the runner already exists, it will be updated with the provided arguments; if it doesn’t exist, it will be created.

jobmanager_heap_size

The memory (MB) of the Flink cluster JobManager.

num_of_taskmanagers

The number of TaskManagers of the Flink cluster.

taskmanager_heap_size

The memory (MB) of each TaskManager in the Flink cluster.

num_task_slots

Number of task slots of the Flink cluster.

cleanup_runner

Kill the runner when the Python process terminates.

ignore_running

Ignore currently running instances of the runner.

Returns:

The artifact_port, expansion_port, job_host, job_port, jobserver.pid
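
Example usage (a minimal sketch; the sizing values are illustrative and the return value bundles the ports and job server PID listed above):

>>> from hops import beam
>>> jobserver_info = beam.start(runner_name='my_runner', taskmanager_heap_size=8192,
>>>                             num_task_slots=2, cleanup_runner=True)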

hops.beam.get_portable_runner_config(sdk_worker_parallelism=1, worker_threads=100, pre_optimize='all', execution_mode_for_batch='BATCH_FORCED')

Instantiate a list of pipeline configuration options for the PortableRunner.

Args:
sdk_worker_parallelism

value for the sdk_worker_parallelism pipeline option

worker_threads

value for the worker_threads pipeline option

pre_optimize

value for the pre_optimize pipeline option

execution_mode_for_batch

value for the execution_mode_for_batch pipeline option

Returns:

a list of pipeline configuration options for the PortableRunner.
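
The returned options are typically combined with the job server connection details when constructing a Beam pipeline. A sketch, assuming the apache_beam package is installed and that the returned list is in the command-line flag form accepted by PipelineOptions:

>>> from hops import beam
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>> runner_config = beam.get_portable_runner_config(sdk_worker_parallelism=1, worker_threads=100)
>>> pipeline_options = PipelineOptions(flags=runner_config)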

hops.dataset module

A module for working with Hopsworks datasets.

class hops.dataset.HTTPUpload(file, path, flow_standard_chunk_size=1048576)

Bases: object

size_last_chunk = 0
DEFAULT_FLOW_STANDARD_CHUNK_SIZE = 1048576
f = None
params = {}
__init__(file, path, flow_standard_chunk_size=1048576)

Initialize self. See help(type(self)) for accurate signature.

resource = None
file = None
flow_standard_chunk_size = 1048576
upload()
hops.dataset.upload(file, remote_path, chunk_size=None)

Upload data to a project’s dataset by providing the path of the local file to be uploaded and a remote_path inside a dataset. The file is split into chunks which are uploaded sequentially. If you run this method more than once for the same file and remote_path, the existing file in Hopsworks will be overwritten.

Example usage:

>>> from hops import dataset
>>> dataset.upload("/tmp/mylocalfile.txt", "Resources/uploaded_files_dir")
Args:
file

the absolute path or the filename of the file to be uploaded.

remote_path

the dataset or the path to the folder in the dataset to upload the file.

Returns:

None

hops.devices module

Utility functions to retrieve information about available devices in the environment.

hops.devices.get_num_gpus()

Get the number of GPUs available in the environment and, consequently, to the application. Assuming there is one GPU in the environment:

>>> from hops import devices
>>> devices.get_num_gpus()
>>> 1
Returns:

Number of GPUs available in the environment

hops.elasticsearch module

A module for setting up the Elasticsearch Spark connector.

hops.elasticsearch.get_elasticsearch_index(index)

Get the valid elasticsearch index for later use. This helper method prefixes the index name with the project name.

Args:
index

the elasticsearch index to interact with.

Returns:

A valid elasticsearch index name.

hops.elasticsearch.get_elasticsearch_config(index)

Get the required elasticsearch configuration to set up a connection using the Spark connector.

Args:
index

the elasticsearch index to interact with.

Returns:

A dictionary with required configuration.
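
Example usage (a sketch; it assumes an active Spark session named spark, an index called "logs" in your project, and the elasticsearch-hadoop Spark data source on the classpath):

>>> from hops import elasticsearch
>>> es_config = elasticsearch.get_elasticsearch_config("logs")
>>> df = spark.read.format("org.elasticsearch.spark.sql").options(**es_config).load(
>>>          elasticsearch.get_elasticsearch_index("logs"))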

hops.elasticsearch.get_authorization_token()

Get the authorization token to interact with elasticsearch.

Args:

Returns:

The authorization token to be used in http header.

hops.exceptions module

Common Exceptions thrown by the hops library

exception hops.exceptions.RestAPIError

Bases: Exception

This exception will be raised if there is an error response from a REST API call to Hopsworks
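
For example, any call that goes through the Hopsworks REST API can be guarded against this error (a sketch; the file and dataset paths are placeholders):

>>> from hops import dataset
>>> from hops.exceptions import RestAPIError
>>> try:
>>>     dataset.upload("/tmp/mylocalfile.txt", "Resources/uploaded_files_dir")
>>> except RestAPIError as e:
>>>     print("Hopsworks REST API call failed: {}".format(e))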

exception hops.exceptions.UnkownSecretStorageError

Bases: Exception

This exception will be raised if an unknown secrets storage is passed as a parameter

exception hops.exceptions.APIKeyFileNotFound

Bases: Exception

This exception will be raised if the file with the API Key is not found in the local filesystem

hops.experiment module

Experiment module used for running Experiments, Parallel Experiments (hyperparameter optimization) and Distributed Training on Hopsworks.

The programming model is that you wrap the code to run inside a wrapper function. Inside that wrapper function provide all imports and parts that make up your experiment, see examples below. Whenever a function to run an experiment is invoked it is also registered in the Experiments service.

Three different types of experiments
  • Run a single standalone Experiment using the launch function.

  • Run Parallel Experiments performing hyperparameter optimization using grid_search or differential_evolution.

  • Run single or multi-machine Distributed Training using parameter_server, collective_all_reduce or mirrored.

class hops.experiment.Direction

Bases: object

MAX = 'MAX'
MIN = 'MIN'
hops.experiment.launch(map_fun, args_dict=None, name='no-name', local_logdir=False, description=None, metric_key=None)

Experiment or Parallel Experiment

Run an Experiment contained in map_fun one time with no arguments or multiple times with different arguments if args_dict is specified.

Example usage:

>>> from hops import experiment
>>> def train_nn():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>> experiment.launch(train_nn)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> def train_nn():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> experiment.launch(train_nn)
Args:
map_fun

The function to run

args_dict

If specified will run the same function multiple times with different arguments, {‘a’:[1,2], ‘b’:[5,3]} would run the function two times with arguments (1,5) and (2,3) provided that the function signature contains two arguments like def func(a,b):

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

A longer description for the experiment

metric_key

If returning a dict with multiple return values, this key should match the name of the key in the dict for the metric you want to associate with the experiment

Returns:

HDFS path in your project where the experiment is stored

Parallel Experiment

Run an Experiment contained in map_fun for a configured number of random samples controlled by the samples parameter. Each hyperparameter is contained in boundary_dict with the key corresponding to the name of the hyperparameter and a list containing two elements defining the lower and upper bound. The experiment must return a metric corresponding to how ‘good’ the given hyperparameter combination is.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max')

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max', optimization_key='accuracy')
Args:
map_fun

The function to run

boundary_dict

dict containing hyperparameter names and corresponding boundaries; each experiment randomizes a value within the boundary range.

direction

Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric

samples

the number of random samples to evaluate for each hyperparameter given the boundaries, for example samples=3 would result in 3 hyperparameter combinations in total to evaluate

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

A longer description for the experiment

optimization_key

When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value

Returns:

HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics

hops.experiment.differential_evolution(objective_function, boundary_dict, direction='MAX', generations=4, population=6, mutation=0.5, crossover=0.7, name='no-name', local_logdir=False, description=None, optimization_key='metric')

Parallel Experiment

Run differential evolution to explore a given search space for each hyperparameter and figure out the best hyperparameter combination. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    import tensorflow
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction=Direction.MAX)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.differential_evolution(train_nn, boundary_dict, direction=Direction.MAX, optimization_key='accuracy')
Args:
objective_function

the function to run, must return a metric

boundary_dict

a dict where each key corresponds to an argument of objective_function and the corresponding value should be a list of two elements, the first element being the lower bound for the parameter and the second element the upper bound.

direction

Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric

generations

number of generations

population

size of population

mutation

mutation rate to explore more different hyperparameters

crossover

how fast to adapt the population to the best in each generation

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

a longer description for the experiment

optimization_key

When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value

Returns:

HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics

Parallel Experiment

Run grid search to explore a predefined set of hyperparameter combinations. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> grid_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.grid_search(train_nn, grid_dict, direction=Direction.MAX)

Returning multiple outputs, including images and logs:

>>> from hops import experiment
>>> grid_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from PIL import Image
>>>    f = open('logfile.txt', 'w')
>>>    f.write('Starting training...')
>>>    accuracy, loss = network.evaluate(learning_rate, layers, dropout)
>>>    img = Image.new(.....)
>>>    img.save('diagram.png')
>>>    return {'accuracy': accuracy, 'loss': loss, 'logfile': 'logfile.txt', 'diagram': 'diagram.png'}
>>> # Important! Remember: optimization_key must be set when returning multiple outputs
>>> experiment.grid_search(train_nn, grid_dict, direction=Direction.MAX, optimization_key='accuracy')
Args:
map_fun

the function to run, must return a metric

grid_dict

a dict with a key for each argument with a corresponding value being a list containing the hyperparameters to test, internally all possible combinations will be generated and run as separate Experiments

direction

Direction.MAX to maximize the returned metric, Direction.MIN to minimize the returned metric

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

a longer description for the experiment

optimization_key

When returning a dict, the key name of the metric to maximize or minimize in the dict should be set as this value

Returns:

HDFS path in your project where the experiment is stored, dict with best hyperparameters and return dict with best metrics

hops.experiment.collective_all_reduce(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Sets up the cluster to run CollectiveAllReduceStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...CollectiveAllReduceStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.collective_all_reduce(distributed_training, local_logdir=True)
Args:
map_fun

the function containing code to run CollectiveAllReduceStrategy

name

the name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

a longer description for the experiment

evaluator

whether to run one of the workers as an evaluator

Returns:

HDFS path in your project where the experiment is stored and return value from the process running as chief

hops.experiment.parameter_server(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Sets up the cluster to run ParameterServerStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...ParameterServerStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.parameter_server(distributed_training, local_logdir=True)
Args:
map_fun

contains the code where you are using ParameterServerStrategy.

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

a longer description for the experiment

evaluator

whether to run one of the workers as an evaluator

Returns:

HDFS path in your project where the experiment is stored and return value from the process running as chief

hops.experiment.mirrored(map_fun, name='no-name', local_logdir=False, description=None, evaluator=False)

Distributed Training

Example usage:

>>> from hops import experiment
>>> def mirrored_training():
>>>    # Do all imports in the function
>>>    import tensorflow
>>>    # Put all code inside the wrapper function
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...MirroredStrategy()...
>>> experiment.mirrored(mirrored_training, local_logdir=True)
Args:
map_fun

contains the code where you are using MirroredStrategy.

name

name of the experiment

local_logdir

True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS

description

a longer description for the experiment

evaluator

whether to run one of the workers as an evaluator

Returns:

HDFS path in your project where the experiment is stored and return value from the process running as chief

hops.featurestore module

A feature store client. This module exposes an API for interacting with feature stores in Hopsworks. It hides complexity and provides utility methods such as:

  • project_featurestore().

  • get_featuregroup().

  • get_feature().

  • get_features().

  • sql()

  • insert_into_featuregroup()

  • get_featurestore_metadata()

  • get_project_featurestores()

  • get_featuregroups()

  • get_training_datasets()

Below are some example usages of this API (assuming you have two featuregroups called ‘trx_graph_summary_features’ and ‘trx_summary_features’ with the following schemas:

|-- cust_id: integer (nullable = true)

|-- pagerank: float (nullable = true)

|-- triangle_count: float (nullable = true)

and

|-- cust_id: integer (nullable = true)

|-- min_trx: float (nullable = true)

|-- max_trx: float (nullable = true)

|-- avg_trx: float (nullable = true)

|-- count_trx: long (nullable = true)

respectively).

>>> from hops import featurestore
>>> # Get feature group example
>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      featuregroup_version = 1)
>>>
>>> # Get single feature example
>>> # The API will infer the featuregroup and default to version 1 for the feature group, and use the project's
>>> # own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx",
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            featuregroup="trx_summary_features",
>>>                                            featuregroup_version = 1)
>>> # When you want to get features from different feature groups the API will infer how to join the features
>>> # together
>>>
>>> # Get list of features example
>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore(),
>>>                                      featuregroups_version_dict={"trx_graph_summary_features": 1,
>>>                                                                  "trx_summary_features": 1},
>>>                                                                  join_key="cust_id")
>>>
>>> # Run SQL query against feature store example
>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
>>>                  featurestore=featurestore.project_featurestore()).show(5)
>>>
>>> # Insert into featuregroup example
>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode
>>> # (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features",
>>>                                      featurestore=featurestore.project_featurestore(),
>>>                                      featuregroup_version=1, mode="append", descriptive_statistics=True,
>>>                                      feature_correlation=True, feature_histograms=True, cluster_analysis=True,
>>>                                      stat_columns=None)
>>>
>>> # Get featurestore metadata example
>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be
>>> # specified with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be
>>> # specified with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
>>>
>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
>>> # By default `get_featurestore_metadata` will use the project's feature store, but this can also be
>>> # specified with the optional argument featurestore
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
>>>
>>> # Compute featuregroup statistics (feature correlation, descriptive stats, feature distributions etc)
>>> # with Spark that will show up in the Featurestore Registry in Hopsworks
>>> # The API will default to the project's featurestore, featuregroup version 1, and
>>> # compute all statistics for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True,feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for certain set of columns and exclude surrogate key-columns
>>> # for example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True, feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True,
>>>                                        stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
>>>
>>> # Create featuregroup from an existing dataframe
>>> # In most cases it is recommended that featuregroups are created in the UI on Hopsworks and that care is
>>> # taken in documenting the featuregroup.
>>> # However, sometimes it is practical to create a featuregroup directly from a spark dataframe and
>>> # fill in the metadata about the featuregroup later in the UI.
>>> # This can be done through the create_featuregroup API function
>>>
>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for
>>> # the new featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2",
>>>                                  description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None)
>>>
>>> # After you have found the features you need in the featurestore you can materialize the features into a
>>> # training dataset so that you can train a machine learning model using the features. Just as for featuregroups,
>>> # it is useful to version and document training datasets, for this reason HopsML supports managed training
>>> # datasets which enables you to easily version, document and automate the materialization of training datasets.
>>>
>>> # First we select the features (and/or labels) that we want
>>> dataset_df = featurestore.get_features(["pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx",
>>>                                         "min_trx","balance", "number_of_accounts"],
>>>                                        featurestore=featurestore.project_featurestore())
>>> # Now we can create a training dataset from the dataframe with some extended metadata such as schema
>>> # (automatically inferred).
>>> # By default when you create a training dataset it will be in "tfrecords" format and statistics will be
>>> # computed for all features.
>>> # After the dataset has been created you can view and/or update the metadata about the training dataset
>>> # from the Hopsworks featurestore UI
>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="",
>>>                                      featurestore=featurestore.project_featurestore(), data_format="csv",
>>>                                      training_dataset_version=1, jobs=[],
>>>                                      descriptive_statistics=False, feature_correlation=False,
>>>                                      feature_histograms=False, cluster_analysis=False, stat_columns=None)
>>>
>>> # Once a dataset has been created, its metadata is browsable in the featurestore registry
>>> # in the Hopsworks UI.
>>> # If you don't want to create a new training dataset but just overwrite or insert new data into an
>>> # existing training dataset,
>>> # you can use the API function 'insert_into_training_dataset'
>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default the insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics, this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset",
>>>                                           featurestore=featurestore.project_featurestore(),
>>>                                           training_dataset_version=1, descriptive_statistics=True,
>>>                                           feature_correlation=True, feature_histograms=True,
>>>                                           cluster_analysis=True, stat_columns=None)
>>>
>>> # After a managed dataset has been created, it is easy to share it and re-use it for training various models.
>>> # For example if the dataset has been materialized in tf-records format you can call the method
>>> # get_training_dataset_path(training_dataset)
>>> # to get the HDFS path and read it directly in your tensorflow code.
>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1,
>>> # but this can be overridden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(),
>>> training_dataset_version=1)
hops.featurestore.project_featurestore()

Gets the project’s featurestore name (project_featurestore)

Returns:

the project’s featurestore name

hops.featurestore.project_training_datasets_sink()

Gets the project’s training datasets sink

Returns:

the default training datasets folder in HopsFS for the project
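
Example usage (both helpers take no arguments and return the project defaults described above):

>>> from hops import featurestore
>>> featurestore.project_featurestore()
>>> featurestore.project_training_datasets_sink()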

hops.featurestore.get_featuregroup(featuregroup, featurestore=None, featuregroup_version=1, dataframe_type='spark', jdbc_args={}, online=False)

Gets a featuregroup from a featurestore as a spark dataframe

Example usage:

>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      featuregroup_version = 1, online=False)
Args:
featuregroup

the featuregroup to get

featurestore

the featurestore where the featuregroup resides, defaults to the project’s featurestore

featuregroup_version

the version of the featuregroup, defaults to 1

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

jdbc_args

a dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature groups

online

a boolean flag whether to fetch the online feature group or the offline one (assuming that the feature group has online serving enabled)

Returns:

a dataframe with the contents of the featuregroup

hops.featurestore.get_feature(feature, featurestore=None, featuregroup=None, featuregroup_version=1, dataframe_type='spark', jdbc_args={}, online=False)

Gets a particular feature (column) from a featurestore. If no featuregroup is specified, it queries the Hopsworks metastore to see if the feature exists in any of the featuregroups in the featurestore. If the user knows which featuregroup contains the feature, it should be specified, as it will improve the performance of the query. Will first try to construct the query from the cached metadata; if that fails, it retries after updating the cache.

Example usage:

>>> # The API will infer the featuregroup and default to version 1 for the feature group, and use the project's
>>> # own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx", featurestore=featurestore.project_featurestore(),
>>> featuregroup="trx_summary_features", featuregroup_version = 1, online=False)
Args:
feature

the feature name to get

featurestore

the featurestore where the featuregroup resides, defaults to the project’s featurestore

featuregroup

(Optional) the featuregroup where the feature resides

featuregroup_version

the version of the featuregroup, defaults to 1

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

jdbc_args

a dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature group in-case the feature belongs to a dynamic feature group

online

a boolean flag whether to fetch the online feature or the offline one (assuming that the feature group that the feature is stored in has online serving enabled) (for cached feature groups only)

Returns:

A dataframe with the feature

hops.featurestore.get_features(features, featurestore=None, featuregroups_version_dict={}, join_key=None, dataframe_type='spark', jdbc_args={}, online=False)

Gets a list of features (columns) from the featurestore. If no featuregroup is specified, it will query the Hopsworks metastore to find where the features are stored. It will try to construct the query first from the cached metadata; if that fails, it will retry after reloading the cache.

Example usage:

>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                     featurestore=featurestore.project_featurestore(),
>>>                                     featuregroups_version_dict={"trx_graph_summary_features": 1,
>>>                                     "trx_summary_features": 1}, join_key="cust_id", online=False)
Args:
features

a list of features to get from the featurestore

featurestore

the featurestore where the featuregroup resides, defaults to the project’s featurestore

featuregroups_version_dict

(Optional) a dict with (featuregroup --> version) for all the featuregroups where the features reside

featuregroup_version

the version of the featuregroup, defaults to 1

join_key

(Optional) column name to join on

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

jdbc_args

a dict of featuregroup_version -> dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature groups

online

a boolean flag whether to fetch the online version of the features (assuming that the feature groups where the features reside have online serving enabled) (for cached feature groups only)

Returns:

A dataframe with all the features

hops.featurestore.sql(query, featurestore=None, dataframe_type='spark', online=False)

Executes a generic SQL query on the featurestore

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
>>>                  featurestore=featurestore.project_featurestore()).show(5)
Args:
query

SQL query

featurestore

the featurestore to query, defaults to the project’s featurestore

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

online

boolean flag whether to run the query against the online featurestore (otherwise it will be the offline featurestore)

Returns:

A dataframe with the query results

hops.featurestore.insert_into_featuregroup(df, featuregroup, featurestore=None, featuregroup_version=1, mode='append', online=False, offline=True)

Saves the given dataframe to the specified featuregroup. Defaults to the project featurestore. This will append to the featuregroup. To overwrite a featuregroup, create a new version of the featuregroup from the UI and append to that table. Statistics will be updated depending on the settings made at creation time of the feature group. To update and recompute previously disabled statistics, use the update_featuregroup_stats method.

Example usage:

>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode
>>> # (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features",
>>>                                      featurestore=featurestore.project_featurestore(), featuregroup_version=1,
>>>                                      mode="append", descriptive_statistics=True, feature_correlation=True,
>>>                                      feature_histograms=True, cluster_analysis=True,
>>>                                      stat_columns=None, online=False, offline=True)
Args:
df

the dataframe containing the data to insert into the featuregroup

featuregroup

the name of the featuregroup (hive table name)

featurestore

the featurestore to save the featuregroup to (hive database)

featuregroup_version

the version of the featuregroup (defaults to 1)

mode

the write mode, only ‘overwrite’ and ‘append’ are supported

online

boolean flag whether to insert the data in the online version of the featuregroup (assuming the featuregroup already has online feature serving enabled)

offline

boolean flag whether to insert the data in the offline version of the featuregroup

Returns:

None

Raises:
CouldNotConvertDataframe

in case the provided dataframe could not be converted to a spark dataframe

hops.featurestore.update_featuregroup_stats(featuregroup, featuregroup_version=1, featurestore=None, descriptive_statistics=None, feature_correlation=None, feature_histograms=None, cluster_analysis=None, stat_columns=None, num_bins=None, num_clusters=None, corr_method=None)

Updates the statistics settings of a featuregroup and recomputes the specified statistics with spark and then saves them to Hopsworks by making a REST call. Leaving a setting set to None will keep the previous value.

Example usage:

>>> # The API will default to the project's featurestore, featuregroup version 1, and compute all statistics
>>> # for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                       featurestore=featurestore.project_featurestore(),
>>>                                       descriptive_statistics=True,feature_correlation=True,
>>>                                       feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for certain set of columns and exclude surrogate key-columns for
>>> # example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True, feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True,
>>>                                        stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
featuregroup

the featuregroup to update the statistics for

featuregroup_version

the version of the featuregroup (defaults to 1)

featurestore

the featurestore where the featuregroup resides (defaults to the project’s featurestore)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the featuregroup

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

num_clusters

the number of clusters to use in clustering analysis (k-means)

corr_method

the method to compute feature correlation with (pearson or spearman)

Returns:

None

hops.featurestore.create_on_demand_featuregroup(sql_query, featuregroup, jdbc_connector_name, featurestore=None, description='', featuregroup_version=1)

Creates a new on-demand feature group in the feature store by registering SQL and an associated JDBC connector

Args:
sql_query

the SQL query to fetch the on-demand feature group

featuregroup

the name of the on-demand feature group

jdbc_connector_name

the name of the JDBC connector to apply the SQL query to get the on-demand feature group

featurestore

name of the feature store to register the feature group

description

description of the feature group

featuregroup_version

version of the feature group

Returns:

None

Raises:
ValueError

in case required inputs are missing
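
Example usage (a sketch; the SQL query, feature group name and JDBC connector name are placeholders for resources you have already configured in your feature store):

>>> featurestore.create_on_demand_featuregroup("SELECT cust_id, balance FROM accounts",
>>>                                            "account_balance_features",
>>>                                            "my_jdbc_connector",
>>>                                            description="on-demand account balance features",
>>>                                            featuregroup_version=1)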

hops.featurestore.create_featuregroup(df, featuregroup, primary_key=[], description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], online=False, online_types=None, offline=True)

Creates a new cached featuregroup from a dataframe of features (sends the metadata to Hopsworks with a REST call to create the Hive table and store the metadata and then inserts the data of the spark dataframe into the newly created table).

The settings for the computation of summary statistics will be saved and applied when new data is inserted. To change the settings or recompute the statistics, use the update_featuregroup_stats method.

Example usage:

>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for the new
>>> # featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2",
>>>                                  description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[], online=False, offline=True)
Args:
df

the dataframe to create the featuregroup for (used to infer the schema)

featuregroup

the name of the new featuregroup

primary_key

a list of columns to be used as the primary key of the new featuregroup; if not specified, the first column in the dataframe will be used as the primary key

description

a description of the featuregroup

featurestore

the featurestore of the featuregroup (defaults to the project’s featurestore)

featuregroup_version

the version of the featuregroup (defaults to 1)

jobs

list of Hopsworks jobs linked to the feature group

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the featuregroup

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

num_clusters

the number of clusters to use for cluster analysis

corr_method

the method to compute feature correlation with (pearson or spearman)

partition_by

a list of columns to partition_by, defaults to the empty list

online

boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data

online_types

a dict with feature_name --> online_type; if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.

offline

boolean flag whether to insert the data in the offline version of the featuregroup

Returns:

None

Raises:
CouldNotConvertDataframe

in case the provided dataframe could not be converted to a spark dataframe

hops.featurestore.get_featurestore_metadata(featurestore=None, update_cache=False)

Sends a REST call to Hopsworks to get the list of featuregroups and their features for the given featurestore.

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
Args:
featurestore

the featurestore to query metadata of

update_cache

if true the cache is updated

Returns:

A list of featuregroups and their metadata

hops.featurestore.get_featuregroups(featurestore=None, online=False)

Gets a list of all featuregroups in a featurestore, uses the cached metadata.

>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be specified
>>> # with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore(), online=False)
Args:
featurestore

the featurestore to list featuregroups for, defaults to the project-featurestore

online

flag whether to filter the featuregroups that have online serving enabled

Returns:

A list of names of the featuregroups in this featurestore

hops.featurestore.get_features_list(featurestore=None, online=False)

Gets a list of all features in a featurestore, will use the cached featurestore metadata

>>> # List all Features in a Feature Store
>>> featurestore.get_features_list()
>>> # By default `get_features_list()` will use the project's feature store, but this can also be specified
>>> # with the optional argument `featurestore`
>>> featurestore.get_features_list(featurestore=featurestore.project_featurestore())
Args:
featurestore

the featurestore to list features for, defaults to the project-featurestore

online

flag whether to filter the features that have online serving enabled

Returns:

A list of names of the features in this featurestore

hops.featurestore.get_featuregroup_features_list(featuregroup, version=None, featurestore=None)

Gets a list of the names of the features in a featuregroup.

Args:
featuregroup

Name of the featuregroup to get feature names for.

version

Version of the featuregroup to use. Defaults to the latest version.

featurestore

The featurestore to list features for. Defaults to project-featurestore.

Returns:

A list of names of the features in this featuregroup.
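
Example usage (using the trx_summary_features featuregroup from the module examples above):

>>> featurestore.get_featuregroup_features_list("trx_summary_features")
>>> # You can also explicitly specify the version and the feature store:
>>> featurestore.get_featuregroup_features_list("trx_summary_features", version=1,
>>>                                             featurestore=featurestore.project_featurestore())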

hops.featurestore.get_training_datasets(featurestore=None)

Gets a list of all training datasets in a featurestore, will use the cached metadata

>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be specified
>>> # with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
Args:
featurestore

the featurestore to list training datasets for, defaults to the project-featurestore

Returns:

A list of names of the training datasets in this featurestore

hops.featurestore.get_project_featurestores()

Gets all featurestores for the current project

Example usage:

>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
Returns:

A list of all featurestores that the project have access to

hops.featurestore.get_dataframe_tf_record_schema(spark_df, fixed=True)

Infers the tf-record schema from a spark dataframe. Note: this method is just for convenience; it should work in 99% of cases but it is not guaranteed, and if spark or tensorflow introduces new datatypes this will break. The user can always fall back to encoding the tf-example-schema manually.

Args:
spark_df

the spark dataframe to infer the tensorflow example record from

fixed

boolean flag indicating whether array columns should be treated with fixed size or variable size

Returns:

a dict with the tensorflow example
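
Example usage (a sketch; dataset_df is a spark dataframe such as the one returned by get_features() in the module examples above):

>>> tf_record_schema = featurestore.get_dataframe_tf_record_schema(dataset_df, fixed=True)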

hops.featurestore.get_training_dataset_tf_record_schema(training_dataset, training_dataset_version=1, featurestore=None)

Gets the tf record schema for a training dataset that is stored in tfrecords format

Example usage:

>>> # get tf record schema for a tfrecords dataset
>>> featurestore.get_training_dataset_tf_record_schema("team_position_prediction", training_dataset_version=1,
>>>                                                    featurestore = featurestore.project_featurestore())
Args:
training_dataset

the training dataset to get the tfrecords schema for

training_dataset_version

the version of the training dataset

featurestore

the feature store where the training dataset resides

Returns:

the tf records schema

hops.featurestore.get_training_dataset(training_dataset, featurestore=None, training_dataset_version=1, dataframe_type='spark')

Reads a training dataset into a spark dataframe. It will first look for the training dataset using the cached metadata of the featurestore; if that fails, it will reload the metadata and try again.

Example usage:

>>> featurestore.get_training_dataset("team_position_prediction_csv").show(5)

Args:
training_dataset

the name of the training dataset to read

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

Returns:

A dataframe with the given training dataset data

hops.featurestore.get_training_dataset_features_list(training_dataset, version=None, featurestore=None)

Gets a list of the names of the features in a training dataset.

Args:
training_dataset

Name of the training dataset to get feature names for.

version

Version of the training dataset to use. Defaults to the latest version.

featurestore

The featurestore to look for the dataset for. Defaults to project-featurestore.

Returns:

A list of names of the features in this training dataset.
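
Example usage (using the AML_dataset training dataset from the module examples above):

>>> featurestore.get_training_dataset_features_list("AML_dataset")
>>> # You can also explicitly specify the version and the feature store:
>>> featurestore.get_training_dataset_features_list("AML_dataset", version=1,
>>>                                                 featurestore=featurestore.project_featurestore())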

hops.featurestore.create_training_dataset(df, training_dataset, description='', featurestore=None, data_format='tfrecords', training_dataset_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, petastorm_args={}, fixed=True, sink=None, path=None)

Creates a new training dataset from a dataframe, saves metadata about the training dataset to the database and saves the materialized dataset on hdfs

Example usage:

>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="",
>>>                                      featurestore=featurestore.project_featurestore(), data_format="csv",
>>>                                      training_dataset_version=1,
>>>                                      descriptive_statistics=False, feature_correlation=False,
>>>                                      feature_histograms=False, cluster_analysis=False, stat_columns=None,
>>>                                      sink = None, path=None)
Args:
df

the dataframe to create the training dataset from

training_dataset

the name of the training dataset

description

a description of the training dataset

featurestore

the featurestore that the training dataset is linked to

data_format

the format of the materialized training dataset

training_dataset_version

the version of the training dataset (defaults to 1)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the featuregroup

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

num_clusters

number of clusters to use for cluster analysis

corr_method

the method to compute feature correlation with (pearson or spearman)

petastorm_args

a dict containing petastorm parameters for serializing a dataset in the petastorm format. Required parameters are: ‘schema’

fixed

boolean flag indicating whether array columns should be treated with fixed size or variable size

sink

name of storage connector to store the training dataset

jobs

list of Hopsworks jobs linked to the training dataset

path

path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset.

Returns:

None

hops.featurestore.get_storage_connectors(featurestore=None)

Retrieves the names of all storage connectors in the feature store

Example usage:

>>> featurestore.get_storage_connectors()
>>> # By default the query will be for the project's feature store but you can also explicitly specify the
>>> # featurestore:
>>> featurestore.get_storage_connectors(featurestore=featurestore.project_featurestore())
Args:
featurestore

the featurestore to query (defaults to the project's feature store)

Returns:

the names of all storage connectors in the feature store

hops.featurestore.get_storage_connector(storage_connector_name, featurestore=None)

Looks up a storage connector by name

Example usage:

>>> featurestore.get_storage_connector("demo_featurestore_admin000_Training_Datasets")
>>> # By default the query will be for the project's feature store but you can also explicitly specify the
>>> # featurestore:
>>> featurestore.get_storage_connector("demo_featurestore_admin000_Training_Datasets",
>>>                                    featurestore=featurestore.project_featurestore())
Args:
storage_connector_name

the name of the storage connector

featurestore

the featurestore to query (defaults to the project's feature store)

Returns:

the storage connector with the given name

hops.featurestore.insert_into_training_dataset(df, training_dataset, featurestore=None, training_dataset_version=1, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, write_mode='overwrite')

Inserts the data in a training dataset from a spark dataframe (append or overwrite)

Example usage:

>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default the insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics, this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset",
>>>                                           featurestore=featurestore.project_featurestore(),
>>>                                           training_dataset_version=1,descriptive_statistics=True,
>>>                                           feature_correlation=True, feature_histograms=True,
>>>                                           cluster_analysis=True, stat_columns=None)
Args:
df

the dataframe to write

training_dataset

the name of the training dataset

featurestore

the featurestore that the training dataset is linked to

training_dataset_version

the version of the training dataset (defaults to 1)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min, max, mean, etc.) for the training dataset

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the training dataset

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the training dataset

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the training dataset

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

num_clusters

number of clusters to use for cluster analysis

corr_method

the method to compute feature correlation with (pearson or spearman)

write_mode

spark write mode (‘append’ or ‘overwrite’). Note: append is not supported for tfrecords datasets.

Returns:

None

hops.featurestore.get_training_dataset_path(training_dataset, featurestore=None, training_dataset_version=1)

Gets the HDFS path to a training dataset with a specific name and version in a featurestore

Example usage:

>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1,
>>> # but this can be overriden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(),
>>>                                        training_dataset_version=1)
Args:
training_dataset

name of the training dataset

featurestore

featurestore that the training dataset is linked to

training_dataset_version

version of the training dataset

Returns:

The HDFS path to the training dataset

hops.featurestore.get_latest_training_dataset_version(training_dataset, featurestore=None)

Utility method to get the latest version of a particular training dataset

Example usage:

>>> featurestore.get_latest_training_dataset_version("team_position_prediction")
Args:
training_dataset

the training dataset to get the latest version of

featurestore

the featurestore where the training dataset resides

Returns:

the latest version of the training dataset in the feature store

hops.featurestore.get_latest_featuregroup_version(featuregroup, featurestore=None)

Utility method to get the latest version of a particular featuregroup

Example usage:

>>> featurestore.get_latest_featuregroup_version("teams_features_spanish")
Args:
featuregroup

the featuregroup to get the latest version of

featurestore

the featurestore where the featuregroup resides

Returns:

the latest version of the featuregroup in the feature store

hops.featurestore.update_training_dataset_stats(training_dataset, training_dataset_version=1, featurestore=None, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, num_clusters=5, corr_method='pearson')

Updates the statistics of a training dataset by computing them with Spark and then saving them to Hopsworks by making a REST call.

Example usage:

>>> # The API will default to the project's featurestore, training dataset version 1, and compute all statistics
>>> # for all columns
>>> featurestore.update_training_dataset_stats("teams_prediction")
>>> # You can also explicitly specify training dataset details and what statistics to compute:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1,
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            descriptive_statistics=True,feature_correlation=True,
>>>                                            feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for certain set of columns and exclude surrogate key-columns
>>> # for example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1,
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            descriptive_statistics=True, feature_correlation=True,
>>>                                            feature_histograms=True, cluster_analysis=True,
>>>                                            stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
training_dataset

the training dataset to update the statistics for

training_dataset_version

the version of the training dataset (defaults to 1)

featurestore

the featurestore where the training dataset resides (defaults to the project’s featurestore)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min, max, mean, etc.) for the training dataset

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the training dataset

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the training dataset

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the training dataset

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

num_clusters

the number of clusters to use in clustering analysis (k-means)

corr_method

the method to compute feature correlation with (pearson or spearman)

Returns:

None

hops.featurestore.get_featuregroup_partitions(featuregroup, featurestore=None, featuregroup_version=1, dataframe_type='spark')

Gets the partitions of a featuregroup

Example usage:

>>> partitions = featurestore.get_featuregroup_partitions("trx_summary_features")
>>> #You can also explicitly define version, featurestore and type of the returned dataframe:
>>> featurestore.get_featuregroup_partitions("trx_summary_features",
>>>                                          featurestore=featurestore.project_featurestore(),
>>>                                          featuregroup_version = 1,
>>>                                          dataframe_type="spark")
Args:
featuregroup

the featuregroup to get partitions for

featurestore

the featurestore where the featuregroup resides, defaults to the project’s featurestore

featuregroup_version

the version of the featuregroup, defaults to 1

dataframe_type

the type of the returned dataframe (spark, pandas, python or numpy)

Returns:

a dataframe with the partitions of the featuregroup

hops.featurestore.visualize_featuregroup_distributions(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=None, color='lightblue', log=False, align='center', plot=True)

Visualizes the feature distributions (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_distributions("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_distributions("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  color="lightblue",
>>>                                                  figsize=None,
>>>                                                  log=False,
>>>                                                  align="center",
>>>                                                  plot=True)
Args:
featuregroup_name

the name of the featuregroup

featurestore

the featurestore where the featuregroup resides

featuregroup_version

the version of the featuregroup

figsize

size of the figure. If None, gets automatically set

color

the color of the histograms

log

whether to use log-scaling on the y-axis or not

align

how to align the bars, defaults to center.

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature distributions

hops.featurestore.visualize_featuregroup_correlations(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=(16, 12), cmap='coolwarm', annot=True, fmt='.2f', linewidths=0.05, plot=True)

Visualizes the feature correlations (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_correlations("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_correlations("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  cmap="coolwarm",
>>>                                                  figsize=(16,12),
>>>                                                  annot=True,
>>>                                                  fmt=".2f",
>>>                                                  linewidths=.05,
>>>                                                  plot=True)
Args:
featuregroup_name

the name of the featuregroup

featurestore

the featurestore where the featuregroup resides

featuregroup_version

the version of the featuregroup

figsize

the size of the figure

cmap

the color map

annot

whether to annotate the heatmap

fmt

how to format the annotations

linewidths

line width in the plot

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature correlations

hops.featurestore.visualize_featuregroup_clusters(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=(16, 12), plot=True)

Visualizes the feature clusters (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_clusters("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_clusters("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  figsize=(16,12),
>>>                                                  plot=True)
Args:
featuregroup_name

the name of the featuregroup

featurestore

the featurestore where the featuregroup resides

featuregroup_version

the version of the featuregroup

figsize

the size of the figure

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature clusters

hops.featurestore.visualize_featuregroup_descriptive_stats(featuregroup_name, featurestore=None, featuregroup_version=1)

Visualizes the descriptive stats (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_descriptive_stats("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_descriptive_stats("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1)
Args:
featuregroup_name

the name of the featuregroup

featurestore

the featurestore where the featuregroup resides

featuregroup_version

the version of the featuregroup

Returns:

A pandas dataframe with the descriptive statistics

Raises:
FeatureVisualizationError

if there was an error in fetching the descriptive statistics

hops.featurestore.visualize_training_dataset_distributions(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), color='lightblue', log=False, align='center', plot=True)

Visualizes the feature distributions (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_distributions("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_distributions("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  color="lightblue",
>>>                                                  figsize=(16,12),
>>>                                                  log=False,
>>>                                                  align="center",
>>>                                                  plot=True)
Args:
training_dataset_name

the name of the training dataset

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

figsize

the size of the figure

color

the color of the histograms

log

whether to use log-scaling on the y-axis or not

align

how to align the bars, defaults to center.

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature distributions

hops.featurestore.visualize_training_dataset_correlations(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), cmap='coolwarm', annot=True, fmt='.2f', linewidths=0.05, plot=True)

Visualizes the feature correlations (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_correlations("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_correlations("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  cmap="coolwarm",
>>>                                                  figsize=(16,12),
>>>                                                  annot=True,
>>>                                                  fmt=".2f",
>>>                                                  linewidths=.05,
>>>                                                  plot=True)
Args:
training_dataset_name

the name of the training dataset

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

figsize

the size of the figure

cmap

the color map

annot

whether to annotate the heatmap

fmt

how to format the annotations

linewidths

line width in the plot

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature correlations

hops.featurestore.visualize_training_dataset_clusters(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), plot=True)

Visualizes the feature clusters (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_clusters("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_clusters("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  figsize=(16,12),
>>>                                                  plot=True)
Args:
training_dataset_name

the name of the training dataset

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

figsize

the size of the figure

plot

if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure

Returns:

if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure

Raises:
FeatureVisualizationError

if there was an error visualizing the feature clusters

hops.featurestore.visualize_training_dataset_descriptive_stats(training_dataset_name, featurestore=None, training_dataset_version=1)

Visualizes the descriptive stats (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_descriptive_stats("AML_dataset")
>>> # You can also explicitly define version and featurestore
>>> featurestore.visualize_training_dataset_descriptive_stats("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1)
Args:
training_dataset_name

the name of the training dataset

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

Returns:

A pandas dataframe with the descriptive statistics

Raises:
FeatureVisualizationError

if there was an error in fetching the descriptive statistics

hops.featurestore.get_featuregroup_statistics(featuregroup_name, featurestore=None, featuregroup_version=1)

Gets the computed statistics (if any) of a featuregroup

Example usage:

>>> stats = featurestore.get_featuregroup_statistics("trx_summary_features")
>>> # You can also explicitly define version and featurestore
>>> stats = featurestore.get_featuregroup_statistics("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1)
Args:
featuregroup_name

the name of the featuregroup

featurestore

the featurestore where the featuregroup resides

featuregroup_version

the version of the featuregroup

Returns:

A Statistics Object

hops.featurestore.get_training_dataset_statistics(training_dataset_name, featurestore=None, training_dataset_version=1)

Gets the computed statistics (if any) of a training dataset

Example usage:

>>> stats = featurestore.get_training_dataset_statistics("AML_dataset")
>>> # You can also explicitly define version and featurestore
>>> stats = featurestore.get_training_dataset_statistics("AML_dataset",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      training_dataset_version = 1)
Args:
training_dataset_name

the name of the training dataset

featurestore

the featurestore where the training dataset resides

training_dataset_version

the version of the training dataset

Returns:

A Statistics Object

hops.featurestore.connect(host, project_name, port=443, region_name='default', secrets_store='parameterstore', api_key_file=None, cert_folder='hops', hostname_verification=True, trust_store_path=None)

Connects to a feature store from a remote environment such as Amazon SageMaker

Example usage:

>>> featurestore.connect("hops.site", "my_feature_store")
Args:
host

the hostname of the Hopsworks cluster

project_name

the name of the project hosting the feature store to be used

port

the REST port of the Hopsworks cluster

region_name

The name of the AWS region in which the required secrets are stored

secrets_store

The secrets storage to be used. Either secretsmanager, parameterstore or local

api_key_file

path to a file containing an API key. For secrets_store=local only.

cert_folder

the folder on dbfs where to store the HopsFS certificates

hostname_verification

whether or not to verify Hopsworks’ certificate - default True

trust_store_path

path on dbfs containing the Hopsworks certificates

Returns:

None

hops.featurestore.setup_databricks(host, project_name, cert_folder='hops', port=443, region_name='default', secrets_store='parameterstore', api_key_file=None, hostname_verification=True, trust_store_path=None)

Set up the HopsFS and Hive connector on a Databricks cluster

Example usage:

>>> featurestore.setup_databricks("hops.site", "my_feature_store")
Args:
host

the hostname of the Hopsworks cluster

project_name

the name of the project hosting the feature store to be used

cert_folder

the folder on dbfs in which to store the Hopsworks certificates and the libraries to be installed when the cluster restarts

port

the REST port of the Hopsworks cluster

region_name

The name of the AWS region in which the required secrets are stored

secrets_store

The secrets storage to be used. Either secretsmanager or parameterstore.

api_key_file

path to a file containing an API key. For secrets_store=local only.

hostname_verification

whether or not to verify Hopsworks’ certificate - default True

trust_store_path

path on dbfs containing the Hopsworks certificates

Returns:

None

hops.featurestore.get_credential(project_id, dbfs_folder)

Gets the credentials and saves them in the dbfs folder

Args:
project_id

the id of the project for which we want to get credentials

dbfs_folder

the folder in which to save the credentials

hops.featurestore.get_clients(project_id, dbfs_folder)

Gets the libraries and saves them in the dbfs folder

Args:
project_id

the id of the project for which we want to get the clients

dbfs_folder

the folder in which to save the libraries

hops.featurestore.write_init_script(dbfs_folder)

Writes the init script

Args:
dbfs_folder

the folder in which to save the script

hops.featurestore.print_instructions(cert_folder, dbfs_folder, host)

Prints the instructions to set up the HopsFS and Hive connection on Databricks

Args:
cert_folder

the path in dbfs of the folder in which the credentials were saved

dbfs_folder

the folder in which the credentials were saved

host

the host of the hive metastore

hops.featurestore.sync_hive_table_with_featurestore(featuregroup, description='', featurestore=None, featuregroup_version=1, jobs=[], feature_corr_data=None, featuregroup_desc_stats_data=None, features_histogram_data=None, cluster_analysis_data=None)

Synchronizes an existing Hive table with a Feature Store.

Example usage:

>>> # Save Hive Table
>>> sample_df.write.mode("overwrite").saveAsTable("hive_fs_sync_example_1")
>>> # Synchronize with Feature Store
>>> featurestore.sync_hive_table_with_featurestore("hive_fs_sync_example", featuregroup_version=1)
Args:
featuregroup

name of the featuregroup to synchronize with the Hive table. The Hive table should follow the naming scheme <featuregroup>_<version>

description

description of the feature group

featurestore

the feature store where the hive table is stored

featuregroup_version

version of the feature group

jobs

jobs to compute this feature group (optional)

feature_corr_data

correlation statistics (optional)

featuregroup_desc_stats_data

descriptive statistics (optional)

features_histogram_data

histogram statistics (optional)

cluster_analysis_data

cluster analysis (optional)

Returns:

None

hops.featurestore.import_featuregroup_redshift(storage_connector, query, featuregroup, primary_key=[], description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], online=False, online_types=None, offline=True)

Imports an external dataset of features into a feature group in Hopsworks. This function reads the dataset using Spark and a configured JDBC storage connector for Redshift, then writes the data to the Hopsworks Feature Store (Hive) and registers its metadata.

Example usage:

>>> featurestore.import_featuregroup_redshift(my_jdbc_connector_name, sql_query, featuregroup_name)
>>> # You can also explicitly specify featuregroup metadata and what statistics to compute:
>>> featurestore.import_featuregroup_redshift(my_jdbc_connector_name, sql_query, featuregroup_name,
>>>                                  primary_key=["id"],
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(), featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[])
Args:
storage_connector

the storage connector used to connect to the external storage

query

the query extracting data from Redshift

featuregroup

name of the featuregroup to import the dataset into the featurestore

primary_key

a list of columns to be used as primary key of the new featuregroup, if not specified, the first column in the dataframe will be used as primary key

description

metadata description of the feature group to import

featurestore

name of the featurestore database to import the feature group into

featuregroup_version

version of the feature group

jobs

list of Hopsworks jobs linked to the feature group (optional)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the featuregroup

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

corr_method

the method to compute feature correlation with (pearson or spearman)

num_clusters

the number of clusters to use for cluster analysis

partition_by

a list of columns to partition_by, defaults to the empty list

online

boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data

online_types

a dict with feature_name -> online_type; if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.

offline

boolean flag whether to insert the data in the offline version of the featuregroup

Returns:

None

hops.featurestore.import_featuregroup_s3(storage_connector, path, featuregroup, primary_key=[], description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], data_format='parquet', online=False, online_types=None, offline=True)

Imports an external dataset of features into a feature group in Hopsworks. This function reads the dataset using Spark and a configured storage connector (e.g. to an S3 bucket), then writes the data to the Hopsworks Feature Store (Hive) and registers its metadata.

Example usage:

>>> featurestore.import_featuregroup_s3(my_s3_connector_name, s3_path, featuregroup_name,
>>>                                  data_format=s3_bucket_data_format)
>>> # You can also explicitly specify featuregroup metadata and what statistics to compute:
>>> featurestore.import_featuregroup_s3(my_s3_connector_name, s3_path, featuregroup_name, primary_key=["id"],
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[], data_format="parquet", online=False,
>>>                                  online_types=None, offline=True)
Args:
storage_connector

the storage connector used to connect to the external storage

path

the path to read from the external storage

featuregroup

name of the featuregroup to import the dataset into the featurestore

primary_key

a list of columns to be used as primary key of the new featuregroup, if not specified, the first column in the dataframe will be used as primary key

description

metadata description of the feature group to import

featurestore

name of the featurestore database to import the feature group into

featuregroup_version

version of the feature group

jobs

list of Hopsworks jobs linked to the feature group (optional)

descriptive_statistics

a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup

feature_correlation

a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup

feature_histograms

a boolean flag whether to compute histograms for the numeric columns in the featuregroup

cluster_analysis

a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup

stat_columns

a list of columns to compute statistics for (defaults to all columns that are numeric)

num_bins

number of bins to use for computing histograms

corr_method

the method to compute feature correlation with (pearson or spearman)

num_clusters

the number of clusters to use for cluster analysis

partition_by

a list of columns to partition_by, defaults to the empty list

data_format

the format of the external dataset to read

online

boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data

online_types

a dict with feature_name -> online_type; if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.

offline

boolean flag whether to insert the data in the offline version of the featuregroup

Returns:

None

hops.featurestore.get_online_featurestore_connector(featurestore=None)

Gets a JDBC connector for the online feature store
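
Example usage (an illustrative sketch; defaults to the project's feature store):

>>> online_connector = featurestore.get_online_featurestore_connector()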

Args:
featurestore

the feature store name

Returns:

a DTO object of the JDBC connector for the online feature store

hops.featurestore.enable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=None, online_types=None)

Enables online feature serving for a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.enable_featuregroup_online(featuregroup_name)
>>> # You can also explicitly override the default arguments:
>>> featurestore.enable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=featurestore,
>>>                                          online_types=None)
Args:
featuregroup_name

name of the featuregroup

featuregroup_version

version of the featuregroup

featurestore

the featurestore that the featuregroup belongs to

online_types

a dict with feature_name -> online_type; if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.

Returns:

None

hops.featurestore.disable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=None)

Disables online feature serving for a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.disable_featuregroup_online(featuregroup_name)
>>> # You can also explicitly override the default arguments:
>>> featurestore.disable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=featurestore)
Args:
featuregroup_name

name of the featuregroup

featuregroup_version

version of the featuregroup

featurestore

the featurestore that the featuregroup belongs to

Returns:

None

hops.featurestore.set_featuregroup_tag(name, tag, value=None, version=1, featurestore=None)

Attach tag to a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.set_featuregroup_tag(featuregroup_name, "SPORT")
>>> # You can also explicitly override the default arguments:
>>> featurestore.set_featuregroup_tag(featuregroup_name, "SPORT", value="Football", version=1, featurestore=featurestore)
Args:
name

name of the featuregroup

tag

name of the tag

value

value of the tag

version

version of the featuregroup

featurestore

the featurestore that the featuregroup belongs to

Returns:

None

hops.featurestore.get_featuregroup_tags(name, version=1, featurestore=None)

Get all tags attached to a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> tags = featurestore.get_featuregroup_tags(featuregroup_name)
>>> # You can also explicitly override the default arguments:
>>> tags = featurestore.get_featuregroup_tags(featuregroup_name, version=1, featurestore=featurestore)
Args:
name

name of the featuregroup

version

version of the featuregroup

featurestore

the featurestore that the featuregroup belongs to

Returns:

The tags dictionary attached to the featuregroup

hops.featurestore.remove_featuregroup_tag(name, tag, version=1, featurestore=None)

Removes a tag attached to a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.remove_featuregroup_tag(featuregroup_name, "SPORT")
>>> # You can also explicitly override the default arguments:
>>> featurestore.remove_featuregroup_tag(featuregroup_name, "SPORT", version=1, featurestore=featurestore)
Args:
name

name of the featuregroup

tag

name of the tag to remove from the featuregroup

version

version of the featuregroup

featurestore

the featurestore that the featuregroup belongs to

Returns:

None

hops.featurestore.set_training_dataset_tag(name, tag, value=None, version=1, featurestore=None)

Attach tag to a training dataset

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.set_training_dataset_tag(training_dataset_name, "SPORT")
>>> # You can also explicitly override the default arguments:
>>> featurestore.set_training_dataset_tag(training_dataset_name, "SPORT", value="Football", version=1, featurestore=featurestore)
Args:
name

name of the training dataset

tag

name of the tag

value

value of the tag

version

version of the training dataset

featurestore

the featurestore that the training dataset belongs to

Returns:

None

hops.featurestore.get_training_dataset_tags(name, version=1, featurestore=None)

Get tags attached to a training dataset

Example usage:

>>> # The API will default to the project's feature store
>>> tags = featurestore.get_training_dataset_tags(training_dataset_name)
>>> # You can also explicitly override the default arguments:
>>> tags = featurestore.get_training_dataset_tags(training_dataset_name, version=1, featurestore=featurestore)
Args:
name

name of the training dataset

version

version of the training dataset

featurestore

the featurestore that the training dataset belongs to

Returns:

The tags dictionary attached to the training dataset

hops.featurestore.remove_training_dataset_tag(name, tag, version=1, featurestore=None)

Removes a tag attached to a training dataset

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.remove_training_dataset_tag(training_dataset_name, "SPORT")
>>> # You can also explicitly override the default arguments:
>>> featurestore.remove_training_dataset_tag(training_dataset_name, "SPORT", version=1, featurestore=featurestore)
Args:
name

name of the training dataset

tag

name of the tag to remove from the training dataset

version

version of the training dataset

featurestore

the featurestore that the training dataset belongs to

Returns:

None

hops.featurestore.get_tags()

Get tags that can be attached to a featuregroup or training dataset

Example usage:

>>> tags = featurestore.get_tags()
Returns:

List of tags

hops.hdfs module

API for interacting with the file system on Hops (HopsFS).

It is a wrapper around pydoop together with utility functions that are Hops-specific.

hops.hdfs.get_plain_path(abs_path)

Convert absolute HDFS/HOPSFS path to plain path (dropping prefix and ip)

Example use-case:

>>> hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
>>> # returns: "/Projects/demo_deep_learning_admin000/Models/"
Args:
abs_path

the absolute HDFS/hopsfs path containing prefix and/or ip

Returns:

the plain path without prefix and ip

hops.hdfs.project_id()

Get the Hopsworks project id from environment variables

Returns:

the Hopsworks project id

hops.hdfs.project_user()

Gets the project username (“project__user”) from environment variables

Returns:

the project username

hops.hdfs.project_name()

Extracts the project name from the project username (“project__user”) or from the environment if available

Returns:

project name

hops.hdfs.project_path(project=None, exclude_nn_addr=False)

Get the path in HopsFS where the HopsWorks project is located. To point to a particular dataset, this path should be appended with the name of your dataset.

>>> from hops import hdfs
>>> project_path = hdfs.project_path()
>>> print("Project path: {}".format(project_path))
Args:
project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

Returns:

returns the project absolute path

hops.hdfs.get()

Get a handle to pydoop hdfs using the default namenode (specified in hadoop config)

Returns:

Pydoop hdfs handle

hops.hdfs.get_fs()

Get a handle to pydoop fs using the default namenode (specified in hadoop config)

Returns:

Pydoop fs handle
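
Example usage for the two handle functions above (a minimal sketch; the handles expose the regular pydoop APIs):

>>> from hops import hdfs
>>> hdfs_handle = hdfs.get()
>>> fs_handle = hdfs.get_fs()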

hops.hdfs.copy_to_hdfs(local_path, relative_hdfs_path, overwrite=False, project=None)

Copies a path from the local filesystem to the HDFS project (recursively), from a path relative to $CWD to a path in HDFS (hdfs_path).

For example, if you execute:

>>> copy_to_hdfs("data.tfrecords", "/Resources", project="demo")

This will copy the file data.tfrecords to hdfs://Projects/demo/Resources/data.tfrecords

Args:
local_path

Absolute or local path on the local filesystem to copy

relative_hdfs_path

a path in HDFS relative to the project root to where the local path should be written

overwrite

a boolean flag whether to overwrite if the path already exists in HDFS

project

name of the project, defaults to the current HDFS user’s project

hops.hdfs.delete(hdfs_path, recursive=False)

Deletes path, path can be absolute or relative. If recursive is set to True and path is a directory, then files will be deleted recursively.

For example

>>> delete("/Resources/", recursive=True)

will delete all files recursively in the folder “Resources” inside the current project.

Args:
hdfs_path

the path to delete (project-relative or absolute)

Returns:

None

Raises:

IOError when recursive is False and directory is non-empty

hops.hdfs.copy_to_local(hdfs_path, local_path='', overwrite=False, project=None)

Copies a directory or file from a HDFS project to a local private scratch directory. If there is not enough space on the local scratch directory, an exception is thrown. If the local file exists, and the hdfs file and the local file are the same size in bytes, return ‘ok’ immediately. If the local directory tree exists, and the hdfs subdirectory and the local subdirectory have the same files and directories, return ‘ok’ immediately.

For example, if you execute:

>>> copy_to_local("Resources/my_data")

This will copy the directory my_data from the Resources dataset in your project to the current working directory on the path ./my_data

Raises:

IOError if there is not enough space to localize the file/directory in HDFS to the scratch directory ($PDIR)

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

local_path

the relative or full path to a directory on the local filesystem to copy to (relative to a scratch directory $PDIR), defaults to $CWD

overwrite

a boolean flag whether to overwrite if the path already exists in the local scratch directory.

project

name of the project, defaults to the current HDFS user’s project

Returns:

the full local pathname of the file/dir

hops.hdfs.cp(src_hdfs_path, dest_hdfs_path, overwrite=False)

Copy the contents of src_hdfs_path to dest_hdfs_path.

If src_hdfs_path is a directory, its contents will be copied recursively. Source file(s) are opened for reading and copies are opened for writing.
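
Example usage (an illustrative sketch; both paths are placeholders relative to the project root):

>>> from hops import hdfs
>>> hdfs.cp("Resources/data.csv", "Logs/data.csv", overwrite=True)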

Args:
src_hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

dest_hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

overwrite

boolean flag whether to overwrite destination path or not.

hops.hdfs.glob(hdfs_path, recursive=False, project=None)

Finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order.

Globbing gives you the list of files in a dir that match a supplied pattern

>>> glob('Resources/*.json')
>>> ['Resources/1.json', 'Resources/2.json']

Python's glob is implemented with os.listdir() and fnmatch.fnmatch(); here, glob is implemented with hdfs.ls() and fnmatch.filter().

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).

project

If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.

Raises:

IOError if the supplied hdfs path does not exist

Returns:

A possibly-empty list of path names that match pathname, which must be a string containing a path specification. pathname can be either absolute or relative.

hops.hdfs.lsl(hdfs_path, recursive=False, project=None)

Returns all the pathnames in the supplied directory.
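
Example usage (an illustrative sketch; the path is a placeholder):

>>> from hops import hdfs
>>> paths = hdfs.lsl("Resources/", recursive=True)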

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).

recursive

if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.

project

If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.

Returns:

A possibly-empty list of path names stored in the supplied path.

hops.hdfs.rmr(hdfs_path, project=None)

Recursively remove files and directories.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).

project

If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.

hops.hdfs.mkdir(hdfs_path, project=None)

Create a directory and its parents as needed.
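
Example usage (an illustrative sketch; the path is a placeholder):

>>> from hops import hdfs
>>> hdfs.mkdir("Resources/my_new_dir/sub_dir")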

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).

project

If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.

hops.hdfs.move(src, dest)

Move or rename src to dest.

Args:
src

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

dest

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

hops.hdfs.rename(src, dest)

Rename src to dest.

Args:
src

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

dest

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

hops.hdfs.chown(hdfs_path, user, group, project=None)

Change file owner and group.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to the given project path in HDFS).

user

New hdfs username

group

New hdfs group

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

hops.hdfs.chmod(hdfs_path, mode, project=None)

Change file mode bits.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

mode

File mode (user/group/world privilege) bits

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

hops.hdfs.access(hdfs_path, mode, project=None)

Perform the equivalent of os.access() on path.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

mode

File mode (user/group/world privilege) bits

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

Returns:

True if access is allowed, False if not.

hops.hdfs.open_file(hdfs_path, project=None, flags='rw', buff_size=0)

Opens an HDFS file for read/write/append and returns a file descriptor object (fd) that should be closed when no longer needed.
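
Example usage (a minimal sketch; the path is a placeholder and text mode is used as described in the flags argument below):

>>> from hops import hdfs
>>> fd = hdfs.open_file("Resources/notes.txt", flags="wt")
>>> fd.write("hello hops")
>>> fd.close()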

Args:

hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project's path in HDFS).

flags

supported opening modes are 'r', 'w', 'a'. In addition, a trailing 't' can be added to specify text mode (e.g. 'rt' = open for reading text).

buff_size

Pass 0 as buff_size if you want to use the "configured" values, i.e. the ones set in the Hadoop configuration files.

Returns:

A file descriptor (fd) that needs to be closed (fd.close()) when it is no longer needed.

Raises:

IOError: If the file does not exist.

hops.hdfs.close()

Closes the HDFS connection (disconnects from the namenode)

hops.hdfs.exists(hdfs_path, project=None)

Return True if hdfs_path exists in the default HDFS.
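
Example usage (an illustrative sketch; the path is a placeholder):

>>> from hops import hdfs
>>> if hdfs.exists("Resources/my_data"):
>>>     print("dataset folder is present")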

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

Returns:

True if hdfs_path exists.

Raises: IOError

hops.hdfs.isfile(hdfs_path, project=None)

Return True if path refers to a file.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

Returns:

True if path refers to a file.

Raises: IOError

hops.hdfs.isdir(hdfs_path, project=None)

Return True if path refers to a directory.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

project

If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.

Returns:

True if path refers to a directory.

Raises: IOError

hops.hdfs.capacity()

Returns the raw capacity of the filesystem

Returns:

filesystem capacity (int)

hops.hdfs.dump(data, hdfs_path)

Dumps data to a file

Args:
data

data to write to hdfs_path

hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

hops.hdfs.load(hdfs_path)

Read the content of hdfs_path and return it.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

Returns:

the read contents of hdfs_path
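
A minimal round trip using dump and load (the path is a placeholder):

>>> from hops import hdfs
>>> hdfs.dump("hello hops", "Resources/hello.txt")
>>> content = hdfs.load("Resources/hello.txt")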

hops.hdfs.ls(hdfs_path, recursive=False, exclude_nn_addr=False)

Lists a directory in HDFS

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

Returns:

returns a list of hdfs paths

hops.hdfs.stat(hdfs_path)

Performs the equivalent of os.stat() on hdfs_path, returning a StatResult object.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

Returns:

returns a StatResult object for hdfs_path

hops.hdfs.abs_path(hdfs_path)

Return an absolute path for hdfs_path.

Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

Returns:

Return an absolute path for hdfs_path.

hops.hdfs.add_module(hdfs_path, project=None)

Add a .py or .ipynb file from HDFS to sys.path

For example, if you execute:

>>> add_module("Resources/my_module.py")
>>> add_module("Resources/my_notebook.ipynb")

You can import it simply as:

>>> import my_module
>>> import my_notebook
Args:
hdfs_path

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS) to a .py or .ipynb file

Returns:

Return full local path to localized python file or converted python file in case of .ipynb file

hops.hdfs.is_tls_enabled()

Reads the ipc.server.ssl.enabled property from core-site.xml.

Returns:

returns True if ipc.server.ssl.enabled is true. False otherwise.

hops.hdfs.get_webhdfs_host()

Makes an SRV DNS query and gets the actual hostname of the NameNode

Returns:

returns NameNode’s hostname

hops.hdfs.get_webhdfs_port()

Makes an SRV DNS query and gets NameNode’s port for WebHDFS

Returns:

returns NameNode’s port for WebHDFS

hops.hive module

hops.hive.setup_hive_connection(database=None)

Exports environment variables with the Hive connection configuration so they can be used by ipython-sql and PyHive to establish a connection with Hive
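
Example usage (an illustrative sketch; the database name is a placeholder):

>>> from hops import hive
>>> hive.setup_hive_connection(database="my_project_featurestore")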

Args:
database

Name of the database to connect to

hops.jobs module

Utility functions to manage jobs in Hopsworks.

hops.jobs.create_job(name, job_config)

Create a job in Hopsworks
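
Example usage (an illustrative sketch; the job name and the keys of job_config are placeholders and depend on the type of job you want to create):

>>> from hops import jobs
>>> job_config = {"appName": "my_job"}  # hypothetical minimal configuration
>>> jobs.create_job("my_job", job_config)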

Args:

name: Name of the job to be created. job_config: A dictionary representing the job configuration

Returns:

HTTP(S)Connection

hops.jobs.start_job(name, args=None)

Start an execution of the job. Only one execution can be active for a job.

Returns:

The job status.

hops.jobs.stop_job(name)

Stop the current execution of the job.

Returns:

The job status.

hops.jobs.get_executions(name, query='')

Get a list of the currently running executions for this job.

Returns:

The job status.
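
A minimal sketch of the job lifecycle using the functions above (the job name is a placeholder and the job is assumed to already exist in Hopsworks):

>>> from hops import jobs
>>> jobs.start_job("my_job")
>>> jobs.get_executions("my_job")
>>> jobs.stop_job("my_job")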

hops.kafka module

A module for setting up Kafka Brokers and Consumers on the Hops platform. It hides the complexity of configuring Kafka by providing utility methods such as:

  • get_broker_endpoints().

  • get_security_protocol().

  • get_kafka_default_config().

  • etc.

Using these utility functions you can set up Kafka with the Kafka client library of your choice, e.g. Spark Streaming or confluent-kafka-python. For example, assuming that you have created a topic called "test" on Hopsworks and that you have installed confluent-kafka-python inside your project's anaconda environment:

>>> from hops import kafka
>>> from confluent_kafka import Producer, Consumer
>>> TOPIC_NAME = "test"
>>> config = kafka.get_kafka_default_config()
>>> producer = Producer(config)
>>> consumer = Consumer(config)
>>> consumer.subscribe(["test"])
>>> # wait a little while before executing the rest of the code (put it in a different Jupyter cell)
>>> # so that the consumer gets a chance to subscribe (asynchronous call)
>>> for i in range(0, 10):
>>>     producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)
>>> # Trigger the sending of all messages to the brokers, 10sec timeout
>>> producer.flush(10)
>>> for i in range(0, 10):
>>>     msg = consumer.poll(timeout=5.0)
>>>     if msg is not None:
>>>         print('Consumed Message: {} from topic: {}'.format(msg.value(), msg.topic()))
>>>     else:
>>>         print("Topic empty, timeout when trying to consume message")

Similarly, you can define a PySpark Kafka consumer as follows, using the Spark session defined in the variable spark:

>>> from hops import kafka
>>> from hops import tls
>>> TOPIC_NAME = "test"
>>> df = spark.read.format("kafka") \
>>>     .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
>>>     .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
>>>     .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd()) \
>>>     .option("kafka.ssl.keystore.location", tls.get_key_store()) \
>>>     .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.key.password", tls.get_key_store_pwd()) \
>>>     .option("subscribe", TOPIC_NAME) \
>>>     .load()
hops.kafka.get_broker_endpoints()

Get Kafka broker endpoints as a comma-separated string.

Returns:

a string with broker endpoints comma-separated

hops.kafka.get_security_protocol()

Gets the security protocol used for communicating with Kafka brokers in a Hopsworks cluster

Returns:

the security protocol for communicating with Kafka brokers in a Hopsworks cluster

hops.kafka.get_broker_endpoints_list()

Get Kafka broker endpoints as a list

Returns:

a list with broker endpoint strings

hops.kafka.get_kafka_default_config()

Gets a default configuration for running secure Kafka on Hops

Returns:

dict with config_property –> value

hops.kafka.get_schema(topic)

Gets the Avro schema for a particular Kafka topic.

Args:
topic

Kafka topic name

Returns:

Avro schema as a string object in JSON format

hops.kafka.parse_avro_msg(msg, avro_schema)

Parses an avro record using a specified avro schema

Args:
msg

the avro message to parse

avro_schema

the avro schema

Returns:

The parsed/decoded message

hops.kafka.convert_json_schema_to_avro(json_schema)

Parses a JSON kafka topic schema returned by Hopsworks REST API into an avro schema

Args:
json_schema

the json schema to convert

Returns:

the avro schema
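
A sketch of how these helpers can be combined when consuming Avro-encoded messages, assuming that get_schema returns the JSON schema string accepted by convert_json_schema_to_avro and that msg is a message consumed as in the example above:

>>> from hops import kafka
>>> json_schema = kafka.get_schema("test")
>>> avro_schema = kafka.convert_json_schema_to_avro(json_schema)
>>> record = kafka.parse_avro_msg(msg.value(), avro_schema)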

class hops.kafka.KafkaTopicDTO(kafka_topic_dto_json)

Bases: object

Represents a KafkaTopic in Hopsworks

__init__(kafka_topic_dto_json)

Initialize the kafka topic from JSON payload returned by Hopsworks REST API

Args:
kafka_topic_dto_json

JSON data about the kafka topic returned from Hopsworks REST API

hops.model module

The model module is used for exporting, versioning, and attaching metadata to models. In pipelines it should be used to query the Model Repository for the best model version to use during inference.

hops.model.get_best_model(name, metric, direction)

Get the best model version by sorting on attached metadata such as model accuracy.

For example if you run this:

>>> from hops import model
>>> from hops.model import Metric
>>> model.get_best_model('mnist', 'accuracy', Metric.MAX)

It will return the mnist version where the ‘accuracy’ is the highest.

Args:
name

name of the model

metric

name of the metric to compare

direction

whether metric should be maximized or minimized to find the best model

Returns:

The best model

Raises:
ModelNotFound

if the model was not found

hops.model.get_model(name, version)

Get a specific model version given a model name and a version.

For example if you run this:

>>> from hops import model
>>> model.get_model('mnist', 1)

You will get version 1 of the model ‘mnist’

Args:
name

name of the model

version

version of the model

Returns:

The specified model version

Raises:
ModelNotFound

if the model was not found

hops.model.export(model_path, model_name, model_version=None, overwrite=False, metrics=None, description=None, synchronous=True, synchronous_timeout=120)

Copies a trained model to the Models directory in the project and creates the directory structure of:

>>> Models
>>>      |
>>>      - model_name
>>>                 |
>>>                 - version_x
>>>                 |
>>>                 - version_y

For example if you run this:

>>> from hops import model
>>> model.export("iris_knn.pkl", "irisFlowerClassifier", metrics={'accuracy': accuracy})

It will copy the local model file “iris_knn.pkl” to /Projects/projectname/Models/irisFlowerClassifier/1/iris_knn.pkl on HDFS, and overwrite it in case there already exists a file with the same name in the directory.

If “model” is a directory on the local path exported by TensorFlow, and you run:

>>> model.export("/model", "mnist", metrics={'accuracy': accuracy, 'loss': loss})

It will copy the model directory contents to /Projects/projectname/Models/mnist/1/, e.g. the “model.pb” file and the “variables” directory.

Args:
model_path

path to the trained model (HDFS or local)

model_name

name of the model

model_version

version of the model

overwrite

boolean flag whether to overwrite in case a model already exists in the exported directory

metrics

dict of evaluation metrics to attach to model

description

description about the model

synchronous

whether to synchronously wait for the model to be indexed in the models rest endpoint

synchronous_timeout

max timeout in seconds for waiting for the model to be indexed

Returns:

The path to where the model was exported

Raises:
ValueError

if there was an error with the export of the model due to invalid user input

ModelNotFound

if the model was not found

class hops.model.Metric

Bases: object

MAX = 'MAX'
MIN = 'MIN'
exception hops.model.ModelNotFound

Bases: Exception

This exception will be raised if the requested model could not be found

hops.numpy_helper module

API for reading/writing numpy arrays to/from HDFS

hops.numpy_helper.load(hdfs_filename, **kwds)

Reads a file from HDFS into a Numpy Array

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.load(…)

Returns:

A numpy array

Raises:

IOError: If the file does not exist

hops.numpy_helper.loadtxt(hdfs_filename, **kwds)

Load data from a text file in HDFS into a Numpy Array. Each row in the text file must have the same number of values.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.loadtxt(…)

Returns:

A numpy array

Raises:

IOError: If the file does not exist

hops.numpy_helper.genfromtxt(hdfs_filename, **kwds)

Load data from a HDFS text file, with missing values handled as specified. Each line past the first skip_header lines is split at the delimiter character, and characters following the comments character are discarded.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.genfromtxt(…)

Returns:

A numpy array

Raises:

IOError: If the file does not exist

hops.numpy_helper.fromregex(hdfs_filename, **kwds)

Construct an array from a text file, using regular expression parsing.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.fromregex(…)

Returns:

A numpy array

Raises:

IOError: If the file does not exist

hops.numpy_helper.fromfile(hdfs_filename, **kwds)

Construct an array from data in a text or binary file.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.fromfile(…)

Returns:

A numpy array

Raises:

IOError: If the file does not exist

hops.numpy_helper.memmap(hdfs_filename, **kwds)

Create a memory-map to an array stored in a binary file on disk.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).

**kwds

You can add any additional args found in numpy.memmap(…)

Returns:

A memmap with dtype and shape that matches the data.

Raises:

IOError: If the file does not exist

hops.numpy_helper.save(hdfs_filename, data)

Saves a numpy array to a file in HDFS

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

data

numpy array

Raises:

IOError: If the local file does not exist
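
A short round-trip sketch; the dataset path "Resources/my_array.npy" is a placeholder:

>>> import numpy as np
>>> from hops import numpy_helper
>>> arr = np.arange(10)
>>> numpy_helper.save("Resources/my_array.npy", arr)
>>> loaded = numpy_helper.load("Resources/my_array.npy")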

hops.numpy_helper.savez(hdfs_filename, *args, **kwds)

Save several arrays into a single file in uncompressed .npz format in HDFS. If arguments are passed in with no keywords, the corresponding variable names in the .npz file are ‘arr_0’, ‘arr_1’, etc. If keyword arguments are given, the corresponding variable names in the .npz file will match the keyword names.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

args

Arguments, optional. Arrays to save to the file. Since it is not possible for Python to know the names of the arrays outside savez, the arrays will be saved with names ‘arr_0’, ‘arr_1’, and so on. These arguments can be any expression.

kwds

Keyword arguments, optional. Arrays to save to the file. Arrays will be saved in the file with the keyword names.

Returns: None

Raises:

IOError: If the local file does not exist

hops.numpy_helper.savez_compressed(hdfs_filename, *args, **kwds)

Save several arrays into a single file in compressed .npz format in HDFS. If arguments are passed in with no keywords, the corresponding variable names in the .npz file are ‘arr_0’, ‘arr_1’, etc. If keyword arguments are given, the corresponding variable names in the .npz file will match the keyword names.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

*args

Arguments, optional. Arrays to save to the file. Since it is not possible for Python to know the names of the arrays outside savez, the arrays will be saved with names ‘arr_0’, ‘arr_1’, and so on. These arguments can be any expression.

**kwds

Keyword arguments, optional. Arrays to save to the file. Arrays will be saved in the file with the keyword names.

Returns: None

Raises:

IOError: If the local file does not exist

hops.pandas_helper module

API for opening csv files into Pandas from HDFS

hops.pandas_helper.read_csv(hdfs_filename, **kwds)

Reads a comma-separated values (csv) file from HDFS into a Pandas DataFrame

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

**kwds

You can add any additional args found in pandas.read_csv(…)

Returns:

A pandas dataframe

Raises:

IOError: If the file does not exist

hops.pandas_helper.read_parquet(hdfs_filename, **kwds)

Load a parquet object from a HDFS path, returning a DataFrame.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

**kwds

You can add any additional args found in pandas.read_parquet(…)

Returns:

A pandas dataframe

Raises:

IOError: If the file does not exist

hops.pandas_helper.read_json(hdfs_filename, **kwds)

Reads a JSON file from HDFS into a Pandas DataFrame.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

**kwds

You can add any additional args found in pandas.read_json(…)

Returns:

A pandas dataframe

Raises:

IOError: If the file does not exist

hops.pandas_helper.read_excel(hdfs_filename, **kwds)

Reads an Excel file from HDFS into a Pandas DataFrame.

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

**kwds

You can add any additional args found in pandas.read_excel(…)

Returns:

A pandas dataframe

Raises:

IOError: If the file does not exist

hops.pandas_helper.write_csv(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a comma-separated values (csv) text file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

dataframe

a Pandas dataframe

**kwds

You can add any additional args found in pandas.DataFrame.to_csv(…)

Raises:

IOError: If the file does not exist
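
A round-trip sketch for the CSV helpers; the path is a placeholder and the keyword argument index=False is simply forwarded to pandas:

>>> import pandas as pd
>>> from hops import pandas_helper
>>> df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
>>> pandas_helper.write_csv("Resources/my_data.csv", df, index=False)
>>> df2 = pandas_helper.read_csv("Resources/my_data.csv")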

hops.pandas_helper.write_parquet(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a parquet file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

dataframe

a Pandas dataframe

**kwds

You can add any additional args found in pandas.DataFrame.to_parquet(…)

Raises:

IOError: If the file does not exist

hops.pandas_helper.write_json(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a JSON file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename

You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)

dataframe

a Pandas dataframe

**kwds

You can add any additional args found in pandas.DataFrame.to_json(…)

Raises:

IOError: If the file does not exist

hops.project module

A module for connecting to and working with Hopsworks projects.

Using the utility functions you can connect to a project of a particular Hopsworks instance, which sets up all the required environment variables and configuration parameters. Then you can use modules such as dataset to interact with particular services of the project.

hops.project.connect(project, host=None, port=443, scheme='https', hostname_verification=False, api_key=None, region_name='default', secrets_store='local', trust_store_path=None)

Connect to a project of a Hopsworks instance. Sets up the API key and the REST API endpoint.

Example usage:

>>> project.connect("dev_featurestore", "hops.site", api_key="api_key_file")
Args:
project

the name of the project to be used

host

the hostname of the Hopsworks cluster. If not specified, the library will attempt to use the one set by the environment variable constants.ENV_VARIABLES.REST_ENDPOINT_END_VAR

port

the REST port of the Hopsworks cluster

scheme

the scheme to use for connecting to the REST API

hostname_verification

whether or not to verify Hopsworks’ certificate - default False

api_key

path to a file containing an API key or the actual API key value. For secrets_store=local only.

region_name

the name of the AWS region in which the required secrets are stored

secrets_store

the secrets storage to be used. Secretsmanager or parameterstore for AWS, local otherwise.

trust_store_path

path to the file containing the Hopsworks certificates

Returns:

None

hops.project.get_project_info(project_name)

Makes a REST call to hopsworks to get all metadata of a project for the provided project.

Args:
project_name

the name of the project

Returns:

JSON response

Raises:
RestAPIError

if there was an error in the REST call to Hopsworks

hops.service_discovery module

class hops.service_discovery.ServiceDiscovery

Bases: object

static get_any_service(service_name)
static get_service(service_name)
static construct_service_fqdn(service_name)

hops.serving module

Utility functions to export models to the Models dataset and get information about models currently being served in the project.

hops.serving.exists(serving_name)

Checks if there exists a serving with the given name

Example use-case:

>>> from hops import serving
>>> serving.exists(serving_name)
Args:
serving_name

the name of the serving

Returns:

True if the serving exists, otherwise False

hops.serving.delete(serving_name)

Deletes serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.delete("irisFlowerClassifier")
Args:
serving_name

name of the serving to delete

Returns:

None

hops.serving.start(serving_name)

Starts a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.start("irisFlowerClassifier")
Args:
serving_name

name of the serving to start

Returns:

None

hops.serving.stop(serving_name)

Stops a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.stop("irisFlowerClassifier")
Args:
serving_name

name of the serving to stop

Returns:

None

hops.serving.create_or_update(artifact_path, serving_name, serving_type='TENSORFLOW', model_version=1, batching_enabled=False, topic_name='CREATE', num_partitions=1, num_replicas=1, instances=1)

Creates a serving in Hopsworks if it does not exist, otherwise updates the existing one.

Example use-case:

>>> from hops import serving
>>> serving.create_or_update("/Models/mnist", "mnist", "TENSORFLOW", 1)
Args:
artifact_path

path to the artifact to serve (tf model dir or sklearn script)

serving_name

name of the serving to create

serving_type

type of the serving, e.g “TENSORFLOW” or “SKLEARN”

model_version

version of the model to serve

batching_enabled

boolean flag whether to enable batching for the inference requests

instances

the number of serving instances (the more instances, the more inference requests can be served in parallel)

Returns:

None

hops.serving.get_id(serving_name)

Gets the id of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_id(serving_name)
Args:
serving_name

name of the serving to get the id for

Returns:

the id of the serving, None if Serving does not exist

hops.serving.get_artifact_path(serving_name)

Gets the artifact path of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_artifact_path(serving_name)
Args:
serving_name

name of the serving to get the artifact path for

Returns:

the artifact path of the serving (model path in case of tensorflow, or python script in case of SkLearn)

hops.serving.get_type(serving_name)

Gets the type of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_type(serving_name)
Args:
serving_name

name of the serving to get the type for

Returns:

the type of the serving (e.g Tensorflow or SkLearn)

hops.serving.get_version(serving_name)

Gets the version of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_version(serving_name)
Args:
serving_name

name of the serving to get the version for

Returns:

the version of the serving

hops.serving.get_kafka_topic(serving_name)

Gets the kafka topic name of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_kafka_topic(serving_name)
Args:
serving_name

name of the serving to get the kafka topic name for

Returns:

the kafka topic name of the serving

hops.serving.get_status(serving_name)

Gets the status of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_status(serving_name)
Args:
serving_name

name of the serving to get the status for

Returns:

the status of the serving

hops.serving.get_all()

Gets the list of servings for the current project

Example:

>>> from hops import serving
>>> servings = serving.get_all()
>>> servings[0].name
Returns:

list of servings

hops.serving.make_inference_request(serving_name, data, verb=':predict')

Submit an inference request

Example use-case:

>>> from hops import serving
>>> serving.make_inference_request("irisFlowerClassifier", [[1,2,3,4]], ":predict")
Args:
serving_name

name of the model being served

data

data/json to send to the serving

verb

type of request (:predict, :classify, or :regress)

Returns:

the JSON response
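
Putting the serving functions together, a hedged end-to-end sketch that mirrors the examples above; the model path, serving name and input vector are placeholders:

>>> from hops import serving
>>> if not serving.exists("mnist"):
>>>     serving.create_or_update("/Models/mnist", "mnist", "TENSORFLOW", 1)
>>> serving.start("mnist")
>>> response = serving.make_inference_request("mnist", [[0.0] * 784], ":predict")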

class hops.serving.Serving(serving_json)

Bases: object

Represents a model being served in Hopsworks

__init__(serving_json)

Initialize the serving from JSON payload returned by Hopsworks REST API

Args:
serving_json

JSON data about the serving returned from Hopsworks REST API

exception hops.serving.ServingNotFound

Bases: Exception

This exception will be raised if the requested serving could not be found

hops.tensorboard module

Utility functions to manage the lifecycle of TensorBoard and get the path to write TensorBoard events.

hops.tensorboard.logdir()

Get the TensorBoard logdir. This function should be called in your wrapper function for Experiment, Parallel Experiment or Distributed Training and passed as the logdir for TensorBoard.

Case 1: local_logdir=True, then the logdir is on the local filesystem. Once the experiment is finished, all the files present in the directory will be uploaded to your experiment directory in the Experiments dataset.

Case 2: local_logdir=False, then the logdir is in HDFS in your experiment directory in the Experiments dataset.

Returns:

The path to store files for your experiment. The content is also visualized in TensorBoard.
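
A minimal sketch of how logdir() is typically used inside an experiment wrapper function; the Keras callback mentioned in the comment is just one illustrative consumer of the path:

>>> from hops import tensorboard
>>> def train_fn():
>>>     logdir = tensorboard.logdir()
>>>     # e.g. pass logdir to tf.keras.callbacks.TensorBoard(log_dir=logdir)
>>>     ...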

hops.tensorboard.interactive_debugger()

Returns: address for interactive debugger in TensorBoard

hops.tensorboard.non_interactive_debugger()

Returns: address for non-interactive debugger in TensorBoard

hops.tls module

A module for getting TLS certificates in YARN containers, used for setting up Kafka inside Jobs/Notebooks on Hops.

hops.tls.get_key_store()
hops.tls.get_trust_store()
hops.tls.get_key_store_cert()

Get keystore certificate from local container

Returns:

The keystore certificate

hops.tls.get_key_store_pwd()

Get keystore password

Returns:

keystore password

hops.tls.get_trust_store_pwd()

Get truststore password

Returns:

truststore password

hops.tls.get_client_certificate_location()

Get location of client certificate (PEM format) for the private key signed by trusted CA used for 2-way TLS authentication, for example with Kafka cluster

Returns:

string path to client certificate in PEM format

hops.tls.get_client_key_location()

Get location of client private key (PEM format) used for 2-way TLS authentication, for example with a Kafka cluster

Returns:

string path to client private key in PEM format

hops.tls.get_ca_chain_location()

Get location of chain of CA certificates (PEM format) that are required to validate the private key certificate of the client used for 2-way TLS authentication, for example with Kafka cluster

Returns:

string path to the CA chain of certificates
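
Beyond Kafka, the PEM material can be used with any TLS client; a hedged sketch for the requests library, where the URL is a placeholder:

>>> from hops import tls
>>> cert = tls.get_client_certificate_location()
>>> key = tls.get_client_key_location()
>>> ca = tls.get_ca_chain_location()
>>> # requests.get("https://my-service:443", cert=(cert, key), verify=ca)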

hops.util module

Miscellaneous utility functions for user applications.

hops.util.http(resource_url, headers=None, method='GET', data=None)
hops.util.set_auth_header(headers)

Set the authorization header for HTTP requests to Hopsworks, depending on whether the setup is remote or not.

Args:

headers: the HTTP headers to add the authorization header to

hops.util.get_requests_verify(hostname, port)

Get the verification method for sending HTTP requests to Hopsworks. Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c

Returns:

If the env var HOPS_UTIL_VERIFY is not false: the path to the truststore (PEM) if the Hopsworks certificate is self-signed, otherwise True. If HOPS_UTIL_VERIFY is false: False.

hops.util.send_request(method, resource, data=None, headers=None, stream=False, files=None)

Sends a request to Hopsworks. In case of an Unauthorized response, the request is submitted once more, as the JWT might not have been read properly from the local container.

Args:

method: HTTP(S) method resource: Hopsworks resource data: HTTP(S) payload headers: HTTP(S) headers stream: set the stream for the session object files: dictionary of {filename: fileobject} files to multipart upload.

Returns:

HTTP(S) response
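
A hedged usage sketch; the resource path below is purely illustrative and must be replaced with a valid Hopsworks REST resource:

>>> from hops import util
>>> response = util.send_request("GET", "/hopsworks-api/api/project")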

hops.util.get_job_name()

If this method is called from inside a hopsworks job, it returns the name of the job.

Returns:

the name of the hopsworks job

hops.util.get_jwt()

Retrieves jwt from local container.

Returns:

Content of jwt.token file in local container.

hops.util.abspath(hdfs_path)
hops.util.parse_redhift_jdbc_url(url)

Parses a Redshift JDBC URL and extracts region_name, cluster_identifier, database and user.

Args:
url

the JDBC URL

Returns:

region_name, cluster_identifier, database, user

hops.util.get_redshift_username_password(region_name, cluster_identifier, user, database)

Requests temporary Redshift credentials with a validity of 3600 seconds and the given parameters.

Args:
region_name

the AWS region name

cluster_identifier

the Redshift cluster identifier

user

the Redshift user to get credentials for

database

the Redshift database

Returns:

user, password
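
A combined sketch for the two Redshift helpers; the JDBC URL below is purely illustrative:

>>> from hops import util
>>> url = "jdbc:redshift://my-cluster.abc123.eu-west-1.redshift.amazonaws.com:5439/dev?user=awsuser"
>>> region_name, cluster_identifier, database, user = util.parse_redhift_jdbc_url(url)
>>> user, password = util.get_redshift_username_password(region_name, cluster_identifier, user, database)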

Returns the Flink configuration directory.

Returns:

The Flink config dir path.

hops.util.num_executors()

Get the number of executors configured for Jupyter

Returns:

Number of configured executors for Jupyter

hops.util.num_param_servers()

Get the number of parameter servers configured for Jupyter

Returns:

Number of configured parameter servers for Jupyter

hops.util.get_secret(secrets_store, secret_key=None, api_key_file=None)

Returns secret value from the AWS Secrets Manager or Parameter Store

Args:
secrets_store

the underlying secrets storage to be used, e.g. secretsmanager or parameterstore

secret_key (str)

key for the secret value, e.g. api-key, cert-key, trust-store, key-store

api_key_file

path to a file containing an api key

Returns:
str

secret value
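
A hedged sketch, assuming a secret with the key "api-key" exists in AWS Secrets Manager:

>>> from hops import util
>>> api_key = util.get_secret("secretsmanager", secret_key="api-key")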

hops.util.write_b64_cert_to_bytes(b64_string, path)

Converts a b64-encoded certificate to a bytes file.

Args:
b64_string (str)

b64 encoded string of certificate

path (str)

path where file is saved, including file name. e.g. /path/key-store.jks

hops.xattr module

API for attaching, detaching, and reading extended metadata on HopsFS files/directories. It uses the Hopsworks /xattrs REST API.

hops.xattr.set_xattr(hdfs_path, xattr_name, value)

Attach an extended attribute to an hdfs_path

Args:
hdfs_path

path of a file or directory

xattr_name

name of the extended attribute

value

value of the extended attribute

Returns:

None

hops.xattr.get_xattr(hdfs_path, xattr_name=None)

Get the extended attribute attached to an hdfs_path.

Args:
hdfs_path

path of a file or directory

xattr_name

name of the extended attribute

Returns:
A dictionary with the extended attribute(s) as key-value pair(s). If xattr_name is None, the API returns all associated extended attributes.

hops.xattr.remove_xattr(hdfs_path, xattr_name)

Remove an extended attribute attached to an hdfs_path

Args:
hdfs_path

path of a file or directory

xattr_name

name of the extended attribute

Returns:

None
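
A full attach/read/remove cycle; the path and attribute name are placeholders:

>>> from hops import xattr
>>> xattr.set_xattr("Resources/my_file.csv", "quality", "validated")
>>> xattr.get_xattr("Resources/my_file.csv", "quality")
>>> xattr.remove_xattr("Resources/my_file.csv", "quality")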

Module contents