hops package

Submodules

hops.beam module

Utility functions to manage the lifecycle of TensorFlow Extended (TFX) and Beam.

hops.beam.create_runner(runner_name, jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1)

Create a Beam runner. Creates a job of the job type that corresponds to the requested runner.

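Example usage (a minimal sketch; the runner name 'my_flink_runner' is illustrative and the remaining arguments keep their documented defaults):

>>> from hops import beam
>>> runner_spec = beam.create_runner(runner_name='my_flink_runner', num_of_taskmanagers=1,
>>>                                  taskmanager_heap_size=4096, num_task_slots=1)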
Args:
runner_name:name of the runner
jobmanager_heap_size:the memory (MB) of the Flink cluster JobManager
num_of_taskmanagers:the number of TaskManagers of the Flink cluster
taskmanager_heap_size:the memory (MB) of each TaskManager in the Flink cluster
num_task_slots:number of task slots of the Flink cluster
Returns:
The runner spec.
hops.beam.exit_handler()
hops.beam.get_portable_runner_config(sdk_worker_parallelism=1, worker_threads=100, pre_optimize='all', execution_mode_for_batch='BATCH_FORCED')

Instantiate a list of pipeline configuration options for the PortableRunner.

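Example usage (a sketch; the values shown simply restate the documented defaults and can be adjusted as needed):

>>> from hops import beam
>>> pipeline_config = beam.get_portable_runner_config(sdk_worker_parallelism=1, worker_threads=100,
>>>                                                   pre_optimize='all', execution_mode_for_batch='BATCH_FORCED')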
Args:
sdk_worker_parallelism:value for the sdk_worker_parallelism pipeline option
worker_threads:value for the worker_threads pipeline option
pre_optimize:value for the pre_optimize pipeline option
execution_mode_for_batch:value for the execution_mode_for_batch pipeline option
Returns:
a list of pipeline configuration options for the PortableRunner.
hops.beam.get_sdk_worker()

Get the path to the portability framework SDK worker script.

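Example usage (a minimal sketch):

>>> from hops import beam
>>> sdk_worker_path = beam.get_sdk_worker()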
Returns:
the path to sdk_worker.sh
hops.beam.start(runner_name='runner', jobmanager_heap_size=1024, num_of_taskmanagers=1, taskmanager_heap_size=4096, num_task_slots=1, kill_runner=False)

Creates and starts a Beam runner and then starts the beam job server.

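Example usage (a sketch; the runner name 'my_runner' is illustrative, and the return values are unpacked according to the Returns description below):

>>> from hops import beam
>>> artifact_port, expansion_port, job_host, job_port, jobserver_pid = beam.start(runner_name='my_runner')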
Args:
runner_name:name of the runner. If not specified, the default runner name "runner" will be used. If the runner already exists, it will be updated with the provided arguments; if it doesn't exist, it will be created.
jobmanager_heap_size:the memory (MB) of the Flink cluster JobManager
num_of_taskmanagers:the number of TaskManagers of the Flink cluster
taskmanager_heap_size:the memory (MB) of each TaskManager in the Flink cluster
num_task_slots:number of task slots of the Flink cluster
kill_runner:whether to kill the runner when the Python process terminates
Returns:
The artifact_port, expansion_port, job_host, job_port, jobserver.pid
hops.beam.start_beam_jobserver(flink_session_name, artifacts_dir='Resources', jobserver_jar=None, sdk_worker_parallelism=1)

Start the Java Beam job server that connects to the Flink session cluster. The user needs to provide the name of the job that started the Flink session and, optionally, the SDK worker parallelism.

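Example usage (a sketch; 'my_flink_session' stands for the name of the job that started the Flink session):

>>> from hops import beam
>>> artifact_port, expansion_port, job_host, job_port, jobserver_pid = beam.start_beam_jobserver('my_flink_session',
>>>                                                                                              sdk_worker_parallelism=1)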
Args:
flink_session_name:
 Job name that runs the Flink session.
sdk_worker_parallelism:
 Default parallelism for SDK worker processes. This option is only applied when the pipeline option sdkWorkerParallelism is set to 0. Default is 1. If set to 0, worker parallelism will be decided dynamically by the runner. See also the sdkWorkerParallelism pipeline option; for further documentation, please refer to the Apache Beam docs.

Returns:
artifact_port, expansion_port, job_host, job_port, jobserver.pid
hops.beam.start_runner(runner_name)

Start the runner. Submits an HTTP request to the Hopsworks REST API to start the job.

Args:
runner_name:name of the runner to start
Returns:
The runner execution status.
hops.beam.stop_runner(runner_name)

Stop the runner.

Args:
runner_name:name of the runner to stop
Returns:
The runner execution status.

hops.devices module

Utility functions to retrieve information about available devices in the environment.

hops.devices.get_num_gpus()

Get the number of GPUs available in the environment and, consequently, to the application.

Example usage, assuming there is one GPU in the environment:

>>> from hops import devices
>>> devices.get_num_gpus()
1
Returns:
Number of GPUs available in the environment

hops.exceptions module

Common Exceptions thrown by the hops library

exception hops.exceptions.RestAPIError

Bases: exceptions.Exception

This exception will be raised if there is an error response from a REST API call to Hopsworks
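
A usage sketch: any hops function that performs a REST call to Hopsworks may raise this exception; the featurestore call below is just one example of such a function.

>>> from hops import featurestore
>>> from hops.exceptions import RestAPIError
>>> try:
>>>     featurestore.get_featuregroup("trx_summary_features")
>>> except RestAPIError as e:
>>>     print("REST call to Hopsworks failed: %s" % e)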

hops.experiment module

Experiment module used for running Experiments, Parallel Experiments and Distributed Training on Hopsworks.

The programming model is that you wrap the code to run inside a wrapper function. Inside that wrapper function provide all imports and parts that make up your experiment, see examples below. Whenever a function to run an experiment is invoked it is also registered in the Experiments service along with the provided information.

There are three different types of experiments:
  • Run a single standalone Experiment using the launch function.
  • Run Parallel Experiments performing hyperparameter optimization using grid_search or differential_evolution.
  • Run single or multi-machine Distributed Training using parameter_server or collective_all_reduce.
hops.experiment.begin(name='no-name', local_logdir=False, versioned_resources=None, description=None)

Start a custom Experiment, at the end of the experiment call end(metric).

IMPORTANT - This call should not be combined with other functions in the experiment module, other than end. Other experiment functions, such as grid_search, manage the begin and end calls internally.

Example usage:

>>> from hops import experiment
>>> experiment.begin(name='calculate pi')
>>> # Code to calculate pi
>>> pi = calc_pi()
>>> experiment.end(pi)
Args:
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.collective_all_reduce(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training

Sets up the cluster to run CollectiveAllReduceStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...CollectiveAllReduceStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.collective_all_reduce(distributed_training, local_logdir=True)
Args:
map_fun:the function containing code to run CollectiveAllReduceStrategy
name:the name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.differential_evolution(objective_function, boundary_dict, direction='max', generations=10, population=10, mutation=0.5, crossover=0.7, cleanup_generations=False, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Parallel Experiment

Run differential evolution to explore a given search space for each hyperparameter and figure out the best hyperparameter combination. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate':[0.01, 0.2], 'dropout': [0.1, 0.9]}
>>> def train_nn(learning_rate, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max')
Args:
objective_function:
 the function to run, must return a metric
boundary_dict:a dict where each key corresponds to an argument of objective_function and the corresponding value should be a list of two elements, the first element being the lower bound for the parameter and the second element the upper bound.
direction:‘max’ to maximize the returned metric, ‘min’ to minimize the returned metric
generations:number of generations
population:size of population
mutation:mutation rate to explore more different hyperparameters
crossover:how fast to adapt the population to the best in each generation
cleanup_generations:
 remove previous generations from HDFS, only keep the last 2
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored, dict with best hyperparameters
hops.experiment.end(metric=None)

End a custom Experiment previously registered with begin and register a metric to associate with it.

Args:
metric:The metric to associate with the Experiment

hops.experiment.grid_search(map_fun, args_dict, direction='max', name='no-name', local_logdir=False, versioned_resources=None, description=None)

Parallel Experiment

Run multiple experiments to test a grid of hyperparameters for a neural network, e.g. to maximize its accuracy.

The following example will run train_nn with 6 different hyperparameter combinations:

>>> from hops import experiment
>>> grid_dict = {'learning_rate':[0.1, 0.3], 'dropout': [0.4, 0.6, 0.1]}
>>> def train_nn(learning_rate, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, dropout)
>>> experiment.grid_search(train_nn, grid_dict, direction='max')

The following argument combinations will be injected into the function, run, and evaluated:

  • (learning_rate=0.1, dropout=0.4)
  • (learning_rate=0.1, dropout=0.6)
  • (learning_rate=0.1, dropout=0.1)
  • (learning_rate=0.3, dropout=0.4)
  • (learning_rate=0.3, dropout=0.6)
  • (learning_rate=0.3, dropout=0.1)
Args:
map_fun:the function to run, must return a metric
args_dict:a dict with a key for each argument with a corresponding value being a list containing the hyperparameters to test, internally all possible combinations will be generated and run as separate Experiments
direction:‘max’ to maximize the returned metric, ‘min’ to minimize the returned metric
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.launch(map_fun, args_dict=None, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Experiment or Parallel Experiment

Run an Experiment contained in map_fun one time with no arguments or multiple times with different arguments if args_dict is specified.

Example usage:

>>> from hops import experiment
>>> def train_nn():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    logdir = tensorboard.logdir()
>>>    # code for preprocessing, training and exporting model
>>>    # optionally return a value for the experiment which is registered in Experiments service
>>> experiment.launch(train_nn)
Args:
map_fun:The function to run
args_dict:If specified will run the same function multiple times with different arguments, {‘a’:[1,2], ‘b’:[5,3]} would run the function two times with arguments (1,5) and (2,3) provided that the function signature contains two arguments like def func(a,b):
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.mirrored(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training

Example usage:

>>> from hops import experiment
>>> def mirrored_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...MirroredStrategy()...
>>> experiment.mirrored(mirrored_training, local_logdir=True)
Args:
map_fun:contains the code where you are using MirroredStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.parameter_server(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training

Sets up the cluster to run ParameterServerStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user themselves.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...ParameterServerStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.parameter_server(distributed_training, local_logdir=True)
Args:
map_fun:contains the code where you are using ParameterServerStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored

hops.experiment.random_search(map_fun, boundary_dict, direction='max', samples=10, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Parallel Experiment

Run an Experiment contained in map_fun for a configured number of random samples controlled by the samples parameter. Each hyperparameter is contained in boundary_dict with the key corresponding to the name of the hyperparameter and a list containing two elements defining the lower and upper bound. The experiment must return a metric corresponding to how ‘good’ the given hyperparameter combination is.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.random_search(train_nn, boundary_dict, samples=14, direction='max')
Args:
map_fun:The function to run
boundary_dict:dict containing hyperparameter names and corresponding boundaries; each experiment randomizes a value in the boundary range.
direction:If set to ‘max’ the highest value returned will correspond to the best solution, if set to ‘min’ the opposite is true
samples:the number of random samples to evaluate for each hyperparameter given the boundaries
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored

hops.featurestore module

A feature store client. This module exposes an API for interacting with feature stores in Hopsworks. It hides complexity and provides utility methods such as:

  • project_featurestore().
  • get_featuregroup().
  • get_feature().
  • get_features().
  • sql()
  • insert_into_featuregroup()
  • get_featurestore_metadata()
  • get_project_featurestores()
  • get_featuregroups()
  • get_training_datasets()

Below are some example usages of this API (assuming you have two featuregroups called ‘trx_graph_summary_features’ and ‘trx_summary_features’ with the following schemas:

|-- cust_id: integer (nullable = true)

|-- pagerank: float (nullable = true)

|-- triangle_count: float (nullable = true)

and

|-- cust_id: integer (nullable = true)

|-- min_trx: float (nullable = true)

|-- max_trx: float (nullable = true)

|-- avg_trx: float (nullable = true)

|-- count_trx: long (nullable = true)

respectively).

>>> from hops import featurestore
>>> # Get feature group example
>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      featuregroup_version = 1)
>>>
>>> # Get single feature example
>>> #The API will infer the featuregroup and default to version 1 for the feature group with this and the project's
>>> # own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx",
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            featuregroup="trx_summary_features",
>>>                                            featuregroup_version = 1)
>>> # When you want to get features from different feature groups the API will infer how to join the features
>>> # together
>>>
>>> # Get list of features example
>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore(),
>>>                                      featuregroups_version_dict={"trx_graph_summary_features": 1,
>>>                                                                  "trx_summary_features": 1},
>>>                                                                  join_key="cust_id")
>>>
>>> # Run SQL query against feature store example
>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
>>>                  featurestore=featurestore.project_featurestore()).show(5)
>>>
>>> # Insert into featuregroup example
>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode
>>> # (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features",
>>>                                      featurestore=featurestore.project_featurestore(),
>>>                                      featuregroup_version=1, mode="append", descriptive_statistics=True,
>>>                                      feature_correlation=True, feature_histograms=True, cluster_analysis=True,
>>>                                      stat_columns=None)
>>>
>>> # Get featurestore metadata example
>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be
>>> # specified with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be
>>> # specified with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
>>>
>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
>>>
>>> # Compute featuregroup statistics (feature correlation, descriptive stats, feature distributions etc)
>>> # with Spark that will show up in the Featurestore Registry in Hopsworks
>>> # The API will default to the project's featurestore, featuregroup version 1, and
>>> # compute all statistics for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True,feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for a certain set of columns and exclude surrogate key-columns
>>> # for example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True, feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True,
>>>                                        stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
>>>
>>> # Create featuregroup from an existing dataframe
>>> # In most cases it is recommended that featuregroups are created in the UI on Hopsworks and that care is
>>> # taken in documenting the featuregroup.
>>> # However, sometimes it is practical to create a featuregroup directly from a spark dataframe and
>>> # fill in the metadata about the featuregroup later in the UI.
>>> # This can be done through the create_featuregroup API function
>>>
>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for
>>> # the new featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2",
>>>                                  description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None)
>>>
>>> # After you have found the features you need in the featurestore you can materialize the features into a
>>> # training dataset so that you can train a machine learning model using the features. Just as for featuregroups,
>>> # it is useful to version and document training datasets, for this reason HopsML supports managed training
>>> # datasets which enables you to easily version, document and automate the materialization of training datasets.
>>>
>>> # First we select the features (and/or labels) that we want
>>> dataset_df = featurestore.get_features(["pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx",
>>>                                         "min_trx","balance", "number_of_accounts"],
>>>                                        featurestore=featurestore.project_featurestore())
>>> # Now we can create a training dataset from the dataframe with some extended metadata such as schema
>>> # (automatically inferred).
>>> # By default when you create a training dataset it will be in "tfrecords" format and statistics will be
>>> # computed for all features.
>>> # After the dataset has been created you can view and/or update the metadata about the training dataset
>>> # from the Hopsworks featurestore UI
>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="",
>>>                                      featurestore=featurestore.project_featurestore(), data_format="csv",
>>>                                      training_dataset_version=1, jobs=[],
>>>                                      descriptive_statistics=False, feature_correlation=False,
>>>                                      feature_histograms=False, cluster_analysis=False, stat_columns=None)
>>>
>>> # Once a dataset has been created, its metadata is browsable in the featurestore registry
>>> # in the Hopsworks UI.
>>> # If you don't want to create a new training dataset but just overwrite or insert new data into an
>>> # existing training dataset,
>>> # you can use the API function 'insert_into_training_dataset'
>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics; this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset",
>>>                                           featurestore=featurestore.project_featurestore(),
>>>                                           training_dataset_version=1, descriptive_statistics=True,
>>>                                           feature_correlation=True, feature_histograms=True,
>>>                                           cluster_analysis=True, stat_columns=None)
>>>
>>> # After a managed dataset has been created, it is easy to share it and re-use it for training various models.
>>> # For example if the dataset has been materialized in tf-records format you can call the method
>>> # get_training_dataset_path(training_dataset)
>>> # to get the HDFS path and read it directly in your tensorflow code.
>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1,
>>> # but this can be overridden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(),
>>> training_dataset_version=1)
hops.featurestore.connect(host, project_name, port=443, region_name='default')

Connects to a feature store from a remote environment such as Amazon SageMaker

Example usage:

>>> featurestore.connect("hops.site", "my_feature_store")
Args:
host:the hostname of the Hopsworks cluster
project_name:the name of the project hosting the feature store to be used
port:the REST port of the Hopsworks cluster
region_name:The name of the AWS region in which the required secrets are stored
Returns:
None
hops.featurestore.create_featuregroup(df, featuregroup, primary_key=None, description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], online=False, online_types=None, offline=True)

Creates a new cached featuregroup from a dataframe of features (sends the metadata to Hopsworks with a REST call to create the Hive table and store the metadata and then inserts the data of the spark dataframe into the newly created table)

Example usage:

>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for the new
>>> # featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2",
>>>                                  description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[], online=False, offline=True)
Args:
df:the dataframe to create the featuregroup for (used to infer the schema)
featuregroup:the name of the new featuregroup
primary_key:the primary key of the new featuregroup; if not specified, the first column in the dataframe will be used as the primary key
description:a description of the featuregroup
featurestore:the featurestore of the featuregroup (defaults to the project’s featurestore)
featuregroup_version:
 the version of the featuregroup (defaults to 1)
jobs:list of Hopsworks jobs linked to the feature group
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
partition_by:a list of columns to partition_by, defaults to the empty list
online:boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data
online_types:a dict with feature_name --> online_type, if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.
offline:boolean flag whether to insert the data in the offline version of the featuregroup

Returns:
None
Raises:
CouldNotConvertDataframe:
 in case the provided dataframe could not be converted to a spark dataframe
hops.featurestore.create_on_demand_featuregroup(sql_query, featuregroup, jdbc_connector_name, featurestore=None, description='', featuregroup_version=1)

Creates a new on-demand feature group in the feature store by registering SQL and an associated JDBC connector

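Example usage (a sketch; the SQL query, feature group name and JDBC connector name are illustrative):

>>> featurestore.create_on_demand_featuregroup("SELECT cust_id, balance FROM customers",
>>>                                            "customers_on_demand", "my_jdbc_connector",
>>>                                            description="customer features fetched on demand")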
Args:
sql_query:the SQL query to fetch the on-demand feature group
featuregroup:the name of the on-demand feature group
jdbc_connector_name:
 the name of the JDBC connector to apply the SQL query to get the on-demand feature group
featurestore:name of the feature store to register the feature group
description:description of the feature group
featuregroup_version:
 version of the feature group
Returns:
None
Raises:
ValueError:in case required inputs are missing
hops.featurestore.create_training_dataset(df, training_dataset, description='', featurestore=None, data_format='tfrecords', training_dataset_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, petastorm_args={}, fixed=True, sink=None, path=None)

Creates a new training dataset from a dataframe, saves metadata about the training dataset to the database and saves the materialized dataset on HDFS.

Example usage:

>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="",
>>>                                      featurestore=featurestore.project_featurestore(), data_format="csv",
>>>                                      training_dataset_version=1,
>>>                                      descriptive_statistics=False, feature_correlation=False,
>>>                                      feature_histograms=False, cluster_analysis=False, stat_columns=None,
>>>                                      sink = None, path=None)
Args:
df:the dataframe to create the training dataset from
training_dataset:
 the name of the training dataset
description:a description of the training dataset
featurestore:the featurestore that the training dataset is linked to
data_format:the format of the materialized training dataset
training_dataset_version:
 the version of the training dataset (defaults to 1)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
petastorm_args:a dict containing petastorm parameters for serializing a dataset in the petastorm format. Required parameters are: ‘schema’
fixed:boolean flag indicating whether array columns should be treated with fixed size or variable size
sink:name of storage connector to store the training dataset
jobs:list of Hopsworks jobs linked to the training dataset
path:path to complement the sink storage connector with, e.g if the storage connector points to an S3 bucket, this path can be used to define a sub-directory inside the bucket to place the training dataset.
Returns:
None
hops.featurestore.disable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=None)

Disables online feature serving for a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.disable_featuregroup_online(featuregroup_name)
>>> # You can also explicitly override the default arguments:
>>> featurestore.disable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=featurestore)
Args:
featuregroup_name:
 name of the featuregroup
featuregroup_version:
 version of the featuregroup
featurestore:the featurestore that the featuregroup belongs to
Returns:
None
hops.featurestore.enable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=None, online_types=None)

Enables online feature serving for a feature group

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.enable_featuregroup_online(featuregroup_name)
>>> # You can also explicitly override the default arguments:
>>> featurestore.enable_featuregroup_online(featuregroup_name, featuregroup_version=1, featurestore=featurestore,
>>>                                         online_types=online_types)
Args:
featuregroup_name:
 name of the featuregroup
featuregroup_version:
 version of the featuregroup
featurestore:the featurestore that the featuregroup belongs to
online_types:a dict with feature_name --> online_type, if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.
Returns:
None
hops.featurestore.get_dataframe_tf_record_schema(spark_df, fixed=True)

Infers the tf-record schema from a spark dataframe. Note: this method is just for convenience, it should work in 99% of cases but it is not guaranteed; if spark or tensorflow introduces new datatypes this will break. The user can always fall back to encoding the tf-example schema manually.

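Example usage (a sketch; spark_df is assumed to be an existing spark dataframe):

>>> # infer the tf-record schema from the dataframe before writing it as tfrecords
>>> tf_record_schema = featurestore.get_dataframe_tf_record_schema(spark_df, fixed=True)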
Args:
spark_df:the spark dataframe to infer the tensorflow example record from
fixed:boolean flag indicating whether array columns should be treated with fixed size or variable size
Returns:
a dict with the tensorflow example
hops.featurestore.get_feature(feature, featurestore=None, featuregroup=None, featuregroup_version=1, dataframe_type='spark', jdbc_args={}, online=False)

Gets a particular feature (column) from a featurestore. If no featuregroup is specified it queries the hopsworks metastore to see if the feature exists in any of the featuregroups in the featurestore. If the user knows which featuregroup contains the feature, it should be specified as it will improve performance of the query. Will first try to construct the query from the cached metadata; if that fails, it retries after updating the cache.

Example usage:

>>> #The API will infer the featuregroup and default to version 1 for the feature group with this and the project's
>>> # own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx", featurestore=featurestore.project_featurestore(),
>>> featuregroup="trx_summary_features", featuregroup_version = 1, online=False)
Args:
feature:the feature name to get
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroup:(Optional) the featuregroup where the feature resides
featuregroup_version:
 the version of the featuregroup, defaults to 1
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
jdbc_args:a dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature group in-case the feature belongs to a dynamic feature group
online:a boolean flag whether to fetch the online feature or the offline one (assuming that the feature group that the feature is stored in has online serving enabled) (for cached feature groups only)
Returns:
A dataframe with the feature
hops.featurestore.get_featuregroup(featuregroup, featurestore=None, featuregroup_version=1, dataframe_type='spark', jdbc_args={}, online=False)

Gets a featuregroup from a featurestore as a spark dataframe

Example usage:

>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      featuregroup_version = 1, online=False)
Args:
featuregroup:the featuregroup to get
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroup_version:
 the version of the featuregroup, defaults to 1
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
jdbc_args:a dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature groups
online:a boolean flag whether to fetch the online feature group or the offline one (assuming that the feature group has online serving enabled)
Returns:
a dataframe with the contents of the featuregroup
hops.featurestore.get_featuregroup_features_list(featuregroup, version=None, featurestore=None)

Gets a list of the names of the features in a featuregroup.

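Example usage (a sketch reusing the feature group from the examples above):

>>> # The API will default to the latest version and the project's feature store
>>> featurestore.get_featuregroup_features_list("trx_summary_features")
>>> # You can also explicitly define version and feature store:
>>> featurestore.get_featuregroup_features_list("trx_summary_features", version=1,
>>>                                             featurestore=featurestore.project_featurestore())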
Args:
featuregroup:Name of the featuregroup to get feature names for.
version:Version of the featuregroup to use. Defaults to the latest version.
featurestore:The featurestore to list features for. Defaults to project-featurestore.
Returns:
A list of names of the features in this featuregroup.
hops.featurestore.get_featuregroup_partitions(featuregroup, featurestore=None, featuregroup_version=1, dataframe_type='spark')

Gets the partitions of a featuregroup

Example usage:

>>> partitions = featurestore.get_featuregroup_partitions("trx_summary_features")
>>> #You can also explicitly define version, featurestore and type of the returned dataframe:
>>> featurestore.get_featuregroup_partitions("trx_summary_features",
>>>                                          featurestore=featurestore.project_featurestore(),
>>>                                          featuregroup_version = 1,
>>>                                          dataframe_type="spark")
Args:
featuregroup:the featuregroup to get partitions for
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroup_version:
 the version of the featuregroup, defaults to 1
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
a dataframe with the partitions of the featuregroup
hops.featurestore.get_featuregroup_statistics(featuregroup_name, featurestore=None, featuregroup_version=1)

Gets the computed statistics (if any) of a featuregroup

Example usage:

>>> stats = featurestore.get_featuregroup_statistics("trx_summary_features")
>>> # You can also explicitly define version and featurestore
>>> stats = featurestore.get_featuregroup_statistics("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1)
Args:
featuregroup_name:
 the name of the featuregroup
featurestore:the featurestore where the featuregroup resides
featuregroup_version:
 the version of the featuregroup
Returns:
A Statistics Object
hops.featurestore.get_featuregroups(featurestore=None, online=False)

Gets a list of all featuregroups in a featurestore, uses the cached metadata.

>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be specified
>>> # with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore(), online=False)
Args:
featurestore:the featurestore to list featuregroups for, defaults to the project-featurestore
online:flag whether to filter the featuregroups that have online serving enabled
Returns:
A list of names of the featuregroups in this featurestore
hops.featurestore.get_features(features, featurestore=None, featuregroups_version_dict={}, join_key=None, dataframe_type='spark', jdbc_args={}, online=False)

Gets a list of features (columns) from the featurestore. If no featuregroup is specified it will query the hopsworks metastore to find where the features are stored. It will first try to construct the query from the cached metadata; if that fails, it will retry after reloading the cache.

Example usage:

>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                      featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"],
>>>                                     featurestore=featurestore.project_featurestore(),
>>>                                     featuregroups_version_dict={"trx_graph_summary_features": 1,
>>>                                     "trx_summary_features": 1}, join_key="cust_id", online=False)
Args:
features:a list of features to get from the featurestore
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroups_version_dict:
 (Optional) a dict with (featuregroup --> version) for all the featuregroups where the features reside, defaults to version 1 of each featuregroup
join_key:(Optional) column name to join on
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
jdbc_args:a dict of featuregroup_version -> dict of argument_name -> value with jdbc connection string arguments to be filled in dynamically at runtime for fetching on-demand feature groups
online:a boolean flag whether to fetch the online version of the features (assuming that the feature groups where the features reside have online serving enabled) (for cached feature groups only)
Returns:
A dataframe with all the features
hops.featurestore.get_features_list(featurestore=None, online=False)

Gets a list of all features in a featurestore, will use the cached featurestore metadata

>>> # List all Features in a Feature Store
>>> featurestore.get_features_list()
>>> # By default `get_features_list()` will use the project's feature store, but this can also be specified
>>> # with the optional argument `featurestore`
>>> featurestore.get_features_list(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to list features for, defaults to the project-featurestore
online:flag whether to filter the features that have online serving enabled
Returns:
A list of names of the features in this featurestore
hops.featurestore.get_featurestore_metadata(featurestore=None, update_cache=False)

Sends a REST call to Hopsworks to get the list of featuregroups and their features for the given featurestore.

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to query metadata of
update_cache:if true the cache is updated
Returns:
A list of featuregroups and their metadata
hops.featurestore.get_latest_featuregroup_version(featuregroup, featurestore=None)

Utility method to get the latest version of a particular featuregroup

Example usage:

>>> featurestore.get_latest_featuregroup_version("teams_features_spanish")
Args:
featuregroup:the featuregroup to get the latest version of
featurestore:the featurestore where the featuregroup resides
Returns:
the latest version of the featuregroup in the feature store
hops.featurestore.get_latest_training_dataset_version(training_dataset, featurestore=None)

Utility method to get the latest version of a particular training dataset

Example usage:

>>> featurestore.get_latest_training_dataset_version("team_position_prediction")
Args:
training_dataset:
 the training dataset to get the latest version of
featurestore:the featurestore where the training dataset resides
Returns:
the latest version of the training dataset in the feature store
hops.featurestore.get_online_featurestore_connector(featurestore=None)

Gets a JDBC connector for the online feature store

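Example usage (a minimal sketch):

>>> # The API will default to the project's feature store
>>> online_connector = featurestore.get_online_featurestore_connector()
>>> # You can also explicitly define the feature store:
>>> online_connector = featurestore.get_online_featurestore_connector(featurestore=featurestore.project_featurestore())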
Args:
featurestore:the feature store name
Returns:
a DTO object of the JDBC connector for the online feature store
hops.featurestore.get_project_featurestores()

Gets all featurestores for the current project

Example usage:

>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
Returns:
A list of all featurestores that the project has access to
hops.featurestore.get_storage_connector(storage_connector_name, featurestore=None)

Looks up a storage connector by name

Example usage:

>>> featurestore.get_storage_connector("demo_featurestore_admin000_Training_Datasets")
>>> # By default the query will be for the project's feature store but you can also explicitly specify the
>>> # featurestore:
>>> featurestore.get_storage_connector("demo_featurestore_admin000_Training_Datasets",
>>>                                    featurestore=featurestore.project_featurestore())
Args:
storage_connector_name:
 the name of the storage connector
featurestore:the featurestore to query (defaults to the project’s feature store)
Returns:
the storage connector with the given name
hops.featurestore.get_storage_connectors(featurestore=None)

Retrieves the names of all storage connectors in the feature store

Example usage:

>>> featurestore.get_storage_connectors()
>>> # By default the query will be for the project's feature store but you can also explicitly specify the
>>> # featurestore:
>>> featurestore.get_storage_connectors(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to query (defaults to the project’s feature store)
Returns:
the names of all storage connectors in the feature store
hops.featurestore.get_training_dataset(training_dataset, featurestore=None, training_dataset_version=1, dataframe_type='spark')

Reads a training dataset into a spark dataframe. Will first look for the training dataset using the cached metadata of the featurestore; if that fails, it will reload the metadata and try again.

Example usage:

>>> featurestore.get_training_dataset("team_position_prediction_csv").show(5)

Args:
training_dataset:
 the name of the training dataset to read
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
A dataframe with the given training dataset data
hops.featurestore.get_training_dataset_features_list(training_dataset, version=None, featurestore=None)

Gets a list of the names of the features in a training dataset.

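Example usage (a sketch reusing the training dataset from the examples above):

>>> # The API will default to the latest version and the project's feature store
>>> featurestore.get_training_dataset_features_list("AML_dataset")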
Args:
training_dataset:
 Name of the training dataset to get feature names for.
version:Version of the training dataset to use. Defaults to the latest version.
featurestore:The featurestore to look for the training dataset in. Defaults to project-featurestore.
Returns:
A list of names of the features in this training dataset.
hops.featurestore.get_training_dataset_path(training_dataset, featurestore=None, training_dataset_version=1)

Gets the HDFS path to a training dataset with a specific name and version in a featurestore

Example usage:

>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1,
>>> # but this can be overridden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(),
>>>                                        training_dataset_version=1)
Args:
training_dataset:
 name of the training dataset
featurestore:featurestore that the training dataset is linked to
training_dataset_version:
 version of the training dataset
Returns:
The HDFS path to the training dataset
hops.featurestore.get_training_dataset_statistics(training_dataset_name, featurestore=None, training_dataset_version=1)

Gets the computed statistics (if any) of a training dataset

Example usage:

>>> stats = featurestore.get_training_dataset_statistics("AML_dataset")
>>> # You can also explicitly define version and featurestore
>>> stats = featurestore.get_training_dataset_statistics("AML_dataset",
>>>                                                      featurestore=featurestore.project_featurestore(),
>>>                                                      training_dataset_version = 1)
Args:
training_dataset_name:
 the name of the training dataset
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
Returns:
A Statistics Object
hops.featurestore.get_training_dataset_tf_record_schema(training_dataset, training_dataset_version=1, featurestore=None)

Gets the tf record schema for a training dataset that is stored in tfrecords format

Example usage:

>>> # get tf record schema for a tfrecords dataset
>>> featurestore.get_training_dataset_tf_record_schema("team_position_prediction", training_dataset_version=1,
>>>                                                    featurestore = featurestore.project_featurestore())
Args:
training_dataset:
 the training dataset to get the tfrecords schema for
training_dataset_version:
 the version of the training dataset
featurestore:the feature store where the training dataset resides
Returns:
the tf records schema
hops.featurestore.get_training_datasets(featurestore=None)

Gets a list of all training datasets in a featurestore, will use the cached metadata

>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be specified
>>> # with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to list training datasets for, defaults to the project-featurestore
Returns:
A list of names of the training datasets in this featurestore
hops.featurestore.import_featuregroup_redshift(storage_connector, query, featuregroup, primary_key=None, description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], online=False, online_types=None, offline=True)

Imports an external dataset of features into a feature group in Hopsworks. This function will read the dataset using spark and a configured JDBC storage connector for Redshift, then write the data to the Hopsworks Feature Store (Hive) and register its metadata.

Example usage:

>>> featurestore.import_featuregroup_redshift(my_jdbc_connector_name, sql_query, featuregroup_name)
>>> # You can also explicitly specify featuregroup metadata and what statistics to compute:
>>> featurestore.import_featuregroup_redshift(my_jdbc_connector_name, sql_query, featuregroup_name, primary_key="id",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(), featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[])
Args:
storage_connector:
 the storage connector used to connect to the external storage
query:the query extracting data from Redshift
featuregroup:name of the featuregroup in the featurestore to import the dataset into
primary_key:primary key of the featuregroup
description:metadata description of the feature group to import
featurestore:name of the featurestore database to import the feature group into
featuregroup_version:
 version of the feature group
jobs:list of Hopsworks jobs linked to the feature group (optional)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
corr_method:the method to compute feature correlation with (pearson or spearman)
num_clusters:the number of clusters to use for cluster analysis
partition_by:a list of columns to partition_by, defaults to the empty list
online:boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data
online_types:a dict with feature_name --> online_type, if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.
offline:boolean flag whether to insert the data in the offline version of the featuregroup

Returns:
None
hops.featurestore.import_featuregroup_s3(storage_connector, path, featuregroup, primary_key=None, description='', featurestore=None, featuregroup_version=1, jobs=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, partition_by=[], data_format='parquet', online=False, online_types=None, offline=True)

Imports an external dataset of features into a feature group in Hopsworks. This function reads the dataset using Spark and a configured storage connector (e.g. to an S3 bucket), then writes the data to the Hopsworks Feature Store (Hive) and registers its metadata.

Example usage:

>>> featurestore.import_featuregroup_s3(my_s3_connector_name, s3_path, featuregroup_name,
>>>                                  data_format=s3_bucket_data_format)
>>> # You can also explicitly specify featuregroup metadata and what statistics to compute:
>>> featurestore.import_featuregroup_s3(my_s3_connector_name, s3_path, featuregroup_name, primary_key="id",
>>>                                  description="trx_summary_features without the column count_trx",
>>>                                  featurestore=featurestore.project_featurestore(),featuregroup_version=1,
>>>                                  jobs=[], descriptive_statistics=False,
>>>                                  feature_correlation=False, feature_histograms=False, cluster_analysis=False,
>>>                                  stat_columns=None, partition_by=[], data_format="parquet", online=False, 
>>>                                  online_types=None, offline=True)
Args:
storage_connector:
 the storage connector used to connect to the external storage
path:the path to read from the external storage
featuregroup:name of the featuregroup to import the dataset into the featurestore
primary_key:primary key of the featuregroup
description:metadata description of the feature group to import
featurestore:name of the featurestore database to import the feature group into
featuregroup_version:
 version of the feature group
jobs:list of Hopsworks jobs linked to the feature group (optional)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
corr_method:the method to compute feature correlation with (pearson or spearman)
num_clusters:the number of clusters to use for cluster analysis
partition_by:a list of columns to partition_by, defaults to the empty list
data_format:the format of the external dataset to read
online:boolean flag, if this is set to true, a MySQL table for online feature data will be created in addition to the Hive table for offline feature data
online_types:a dict with feature_name --> online_type, if a feature is present in this dict, the online_type will be taken from the dict rather than inferred from the spark dataframe.
offline:boolean flag whether to insert the data in the offline version of the featuregroup

Returns:
None
hops.featurestore.insert_into_featuregroup(df, featuregroup, featurestore=None, featuregroup_version=1, mode='append', descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, online=False, offline=True)

Saves the given dataframe to the specified featuregroup. Defaults to the project-featurestore. This will append to the featuregroup. To overwrite a featuregroup, create a new version of the featuregroup from the UI and append to that table.

Example usage:

>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode
>>> # (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features",
>>>                                      featurestore=featurestore.project_featurestore(), featuregroup_version=1,
>>>                                      mode="append", descriptive_statistics=True, feature_correlation=True,
>>>                                      feature_histograms=True, cluster_analysis=True,
>>>                                      stat_columns=None, online=False, offline=True)
Args:
df:the dataframe containing the data to insert into the featuregroup
featuregroup:the name of the featuregroup (hive table name)
featurestore:the featurestore to save the featuregroup to (hive database)
featuregroup_version:
 the version of the featuregroup (defaults to 1)
mode:the write mode, only ‘overwrite’ and ‘append’ are supported
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
online:boolean flag whether to insert the data in the online version of the featuregroup (assuming the featuregroup already has online feature serving enabled)

offline:boolean flag whether to insert the data in the offline version of the featuregroup

Returns:
None
Raises:
CouldNotConvertDataframe:
 in case the provided dataframe could not be converted to a spark dataframe
hops.featurestore.insert_into_training_dataset(df, training_dataset, featurestore=None, training_dataset_version=1, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, write_mode='overwrite')

Inserts the data in a training dataset from a spark dataframe (append or overwrite)

Example usage:

>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default the insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics, this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset",
>>>                                           featurestore=featurestore.project_featurestore(),
>>>                                           training_dataset_version=1,descriptive_statistics=True,
>>>                                           feature_correlation=True, feature_histograms=True,
>>>                                           cluster_analysis=True, stat_columns=None)
Args:
df:the dataframe to write
training_dataset:
 the name of the training dataset
featurestore:the featurestore that the training dataset is linked to
training_dataset_version:
 the version of the training dataset (defaults to 1)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
write_mode:spark write mode (‘append’ or ‘overwrite’). Note: append is not supported for tfrecords datasets.
Returns:
None
hops.featurestore.project_featurestore()

Gets the project’s featurestore name (project_featurestore)

Returns:
the project’s featurestore name
hops.featurestore.project_training_datasets_sink()

Gets the project’s training datasets sink

Returns:
the default training datasets folder in HopsFS for the project
hops.featurestore.sql(query, featurestore=None, dataframe_type='spark', online=False)

Executes a generic SQL query on the featurestore

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5",
>>>                  featurestore=featurestore.project_featurestore()).show(5)
Args:
query:SQL query
featurestore:the featurestore to query, defaults to the project’s featurestore
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
online:boolean flag whether to run the query against the online featurestore (otherwise it will be the offline featurestore)
Returns:
A dataframe with the query results
hops.featurestore.sync_hive_table_with_featurestore(featuregroup, description='', featurestore=None, featuregroup_version=1, jobs=[], feature_corr_data=None, featuregroup_desc_stats_data=None, features_histogram_data=None, cluster_analysis_data=None)

Synchronizes an existing Hive table with a Feature Store.

Example usage:

>>> # Save Hive Table
>>> sample_df.write.mode("overwrite").saveAsTable("hive_fs_sync_example_1")
>>> # Synchronize with Feature Store
>>> featurestore.sync_hive_table_with_featurestore("hive_fs_sync_example", featuregroup_version=1)
Args:
featuregroup:name of the featuregroup to synchronize with the hive table. The hive table should have a naming scheme of featuregroup_version
description:description of the feature group
featurestore:the feature store where the hive table is stored
featuregroup_version:
 version of the feature group
jobs:jobs to compute this feature group (optional)
feature_corr_data:
 correlation statistics (optional)
featuregroup_desc_stats_data:
 descriptive statistics (optional)
features_histogram_data:
 histogram statistics (optional)
cluster_analysis_data:
 cluster analysis (optional)
Returns:
None
hops.featurestore.update_featuregroup_stats(featuregroup, featuregroup_version=1, featurestore=None, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, num_clusters=5, corr_method='pearson')

Updates the statistics of a featuregroup by computing the statistics with spark and then saving it to Hopsworks by making a REST call.

Example usage:

>>> # The API will default to the project's featurestore, featuregroup version 1, and compute all statistics
>>> # for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                       featurestore=featurestore.project_featurestore(),
>>>                                       descriptive_statistics=True,feature_correlation=True,
>>>                                       feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for certain set of columns and exclude surrogate key-columns for
>>> # example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1,
>>>                                        featurestore=featurestore.project_featurestore(),
>>>                                        descriptive_statistics=True, feature_correlation=True,
>>>                                        feature_histograms=True, cluster_analysis=True,
>>>                                        stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
featuregroup:the featuregroup to update the statistics for
featuregroup_version:
 the version of the featuregroup (defaults to 1)
featurestore:the featurestore where the featuregroup resides (defaults to the project’s featurestore)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use in clustering analysis (k-means)
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.update_training_dataset_stats(training_dataset, training_dataset_version=1, featurestore=None, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, num_clusters=5, corr_method='pearson')

Updates the statistics of a training dataset by computing the statistics with spark and then saving it to Hopsworks by making a REST call.

Example usage:

>>> # The API will default to the project's featurestore, training dataset version 1, and compute all statistics
>>> # for all columns
>>> featurestore.update_training_dataset_stats("teams_prediction")
>>> # You can also explicitly specify training dataset details and what statistics to compute:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1,
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            descriptive_statistics=True,feature_correlation=True,
>>>                                            feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for certain set of columns and exclude surrogate key-columns
>>> # for example, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1,
>>>                                            featurestore=featurestore.project_featurestore(),
>>>                                            descriptive_statistics=True, feature_correlation=True,
>>>                                            feature_histograms=True, cluster_analysis=True,
>>>                                            stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
training_dataset:
 the training dataset to update the statistics for
training_dataset_version:
 the version of the training dataset (defaults to 1)
featurestore:the featurestore where the training dataset resides (defaults to the project’s featurestore)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use in clustering analysis (k-means)
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.visualize_featuregroup_clusters(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=(16, 12), plot=True)

Visualizes the feature clusters (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_clusters("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_clusters("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  figsize=(16,12),
>>>                                                  plot=True)
Args:
featuregroup_name:
 the name of the featuregroup
featurestore:the featurestore where the featuregroup resides
featuregroup_version:
 the version of the featuregroup
figsize:the size of the figure
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature clusters
hops.featurestore.visualize_featuregroup_correlations(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=(16, 12), cmap='coolwarm', annot=True, fmt='.2f', linewidths=0.05, plot=True)

Visualizes the feature correlations (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_correlations("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_correlations("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  cmap="coolwarm",
>>>                                                  figsize=(16,12),
>>>                                                  annot=True,
>>>                                                  fmt=".2f",
>>>                                                  linewidths=.05,
>>>                                                  plot=True)
Args:
featuregroup_name:
 the name of the featuregroup
featurestore:the featurestore where the featuregroup resides
featuregroup_version:
 the version of the featuregroup
figsize:the size of the figure
cmap:the color map
annot:whether to annotate the heatmap
fmt:how to format the annotations
linewidths:line width in the plot
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature correlations
hops.featurestore.visualize_featuregroup_descriptive_stats(featuregroup_name, featurestore=None, featuregroup_version=1)

Visualizes the descriptive stats (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_descriptive_stats("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_descriptive_stats("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1)
Args:
featuregroup_name:
 the name of the featuregroup
featurestore:the featurestore where the featuregroup resides
featuregroup_version:
 the version of the featuregroup
Returns:
A pandas dataframe with the descriptive statistics
Raises:
FeatureVisualizationError:
 if there was an error in fetching the descriptive statistics
hops.featurestore.visualize_featuregroup_distributions(featuregroup_name, featurestore=None, featuregroup_version=1, figsize=None, color='lightblue', log=False, align='center', plot=True)

Visualizes the feature distributions (if they have been computed) for a featuregroup in the featurestore

Example usage:

>>> featurestore.visualize_featuregroup_distributions("trx_summary_features")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_featuregroup_distributions("trx_summary_features",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  featuregroup_version = 1,
>>>                                                  color="lightblue",
>>>                                                  figsize=None,
>>>                                                  log=False,
>>>                                                  align="center",
>>>                                                  plot=True)
Args:
featuregroup_name:
 the name of the featuregroup
featurestore:the featurestore where the featuregroup resides
featuregroup_version:
 the version of the featuregroup
figsize:size of the figure. If None, gets automatically set
color:the color of the histograms
log:whether to use log-scaling on the y-axis or not
align:how to align the bars, defaults to center.
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature distributions
hops.featurestore.visualize_training_dataset_clusters(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), plot=True)

Visualizes the feature clusters (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_clusters("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_clusters("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  figsize=(16,12),
>>>                                                  plot=True)
Args:
training_dataset_name:
 the name of the training dataset
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
figsize:the size of the figure
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature clusters
hops.featurestore.visualize_training_dataset_correlations(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), cmap='coolwarm', annot=True, fmt='.2f', linewidths=0.05, plot=True)

Visualizes the feature correlations (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_correlations("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_correlations("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  cmap="coolwarm",
>>>                                                  figsize=(16,12),
>>>                                                  annot=True,
>>>                                                  fmt=".2f",
>>>                                                  linewidths=.05,
>>>                                                  plot=True)
Args:
training_dataset_name:
 the name of the training dataset
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
figsize:the size of the figure
cmap:the color map
annot:whether to annotate the heatmap
fmt:how to format the annotations
linewidths:line width in the plot
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature correlations
hops.featurestore.visualize_training_dataset_descriptive_stats(training_dataset_name, featurestore=None, training_dataset_version=1)

Visualizes the descriptive stats (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_descriptive_stats("AML_dataset")
>>> # You can also explicitly define version and featurestore
>>> featurestore.visualize_training_dataset_descriptive_stats("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1)
Args:
training_dataset_name:
 the name of the training dataset
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
Returns:
A pandas dataframe with the descriptive statistics
Raises:
FeatureVisualizationError:
 if there was an error in fetching the descriptive statistics
hops.featurestore.visualize_training_dataset_distributions(training_dataset_name, featurestore=None, training_dataset_version=1, figsize=(16, 12), color='lightblue', log=False, align='center', plot=True)

Visualizes the feature distributions (if they have been computed) for a training dataset in the featurestore

Example usage:

>>> featurestore.visualize_training_dataset_distributions("AML_dataset")
>>> # You can also explicitly define version, featurestore and plotting options
>>> featurestore.visualize_training_dataset_distributions("AML_dataset",
>>>                                                  featurestore=featurestore.project_featurestore(),
>>>                                                  training_dataset_version = 1,
>>>                                                  color="lightblue",
>>>                                                  figsize=(16,12),
>>>                                                  log=False,
>>>                                                  align="center",
>>>                                                  plot=True)
Args:
training_dataset_name:
 the name of the training dataset
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
figsize:the size of the figure
color:the color of the histograms
log:whether to use log-scaling on the y-axis or not
align:how to align the bars, defaults to center.
plot:if set to True it will plot the image and return None, if set to False it will not plot it but rather return the figure
Returns:
if the ‘plot’ flag is set to True it will plot the image and return None, if the ‘plot’ flag is set to False it will not plot it but rather return the figure
Raises:
FeatureVisualizationError:
 if there was an error visualizing the feature distributions

hops.hdfs module

API for interacting with the file system on Hops (HopsFS).

It is a wrapper around pydoop together with utility functions that are Hops-specific.

hops.hdfs.abs_path(hdfs_path)

Return an absolute path for hdfs_path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
Return an absolute path for hdfs_path.
hops.hdfs.access(hdfs_path, mode, project=None)

Perform the equivalent of os.access() on path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if access is allowed, False if not.
hops.hdfs.add_module(hdfs_path, project=None)

Add a .py or .ipynb file from HDFS to sys.path

For example, if you execute:

>>> add_module("Resources/my_module.py")
>>> add_module("Resources/my_notebook.ipynb")

You can import it simply as:

>>> import my_module
>>> import my_notebook
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS) to a .py or .ipynb file
Returns:
Return full local path to localized python file or converted python file in case of .ipynb file
hops.hdfs.capacity()

Returns the raw capacity of the filesystem

Returns:
filesystem capacity (int)
hops.hdfs.chmod(hdfs_path, mode, project=None)

Change file mode bits.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.chown(hdfs_path, user, group, project=None)

Change file owner and group.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to the given project path in HDFS).
user:New hdfs username
group:New hdfs group
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.close()

Closes the HDFS connection (disconnects from the namenode)

hops.hdfs.copy_to_hdfs(local_path, relative_hdfs_path, overwrite=False, project=None)

Copies a path from the local filesystem (relative to $CWD or absolute) recursively to a path in the HDFS project (relative_hdfs_path)

For example, if you execute:

>>> copy_to_hdfs("data.tfrecords", "/Resources", project="demo")

This will copy the file data.tfrecords to hdfs://Projects/demo/Resources/data.tfrecords

Args:
local_path:Absolute or local path on the local filesystem to copy
relative_hdfs_path:
 a path in HDFS relative to the project root to where the local path should be written
overwrite:a boolean flag whether to overwrite if the path already exists in HDFS
project:name of the project, defaults to the current HDFS user’s project
hops.hdfs.copy_to_local(hdfs_path, local_path='', overwrite=False, project=None)

Copies a directory or file from a HDFS project to a local private scratch directory. If there is not enough space on the local scratch directory, an exception is thrown. If the local file exists, and the hdfs file and the local file are the same size in bytes, return ‘ok’ immediately. If the local directory tree exists, and the hdfs subdirectory and the local subdirectory have the same files and directories, return ‘ok’ immediately.

For example, if you execute:

>>> copy_to_local("Resources/my_data")

This will copy the directory my_data from the Resources dataset in your project to the current working directory on the path ./my_data

Raises:
IOError if there is not enough space to localize the file/directory in HDFS to the scratch directory ($PDIR)
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
local_path:the relative or full path to a directory on the local filesystem to copy to (relative to a scratch directory $PDIR), defaults to $CWD
overwrite:a boolean flag whether to overwrite if the path already exists in the local scratch directory.
project:name of the project, defaults to the current HDFS user’s project
Returns:
the full local pathname of the file/dir
hops.hdfs.cp(src_hdfs_path, dest_hdfs_path, overwrite=False)

Copy the contents of src_hdfs_path to dest_hdfs_path.

If src_hdfs_path is a directory, its contents will be copied recursively. Source file(s) are opened for reading and copies are opened for writing.

Args:
src_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
overwrite:boolean flag whether to overwrite destination path or not.
hops.hdfs.delete(hdfs_path, recursive=False)

Deletes path, path can be absolute or relative. If recursive is set to True and path is a directory, then files will be deleted recursively.

For example

>>> delete("/Resources/", recursive=True)

will delete all files recursively in the folder “Resources” inside the current project.

Args:
hdfs_path:the path to delete (project-relative or absolute)
Returns:
None
Raises:
IOError when recursive is False and directory is non-empty
hops.hdfs.dump(data, hdfs_path)

Dumps data to a file

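A minimal round-trip sketch (the path and data below are illustrative, and hdfs.load is documented further down):

>>> from hops import hdfs
>>> hdfs.dump("hello hops", "Resources/hello.txt")
>>> hdfs.load("Resources/hello.txt")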
Args:
data:data to write to hdfs_path
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.exists(hdfs_path, project=None)

Return True if hdfs_path exists in the default HDFS.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if hdfs_path exists.

Raises: IOError

hops.hdfs.get()

Get a handle to pydoop hdfs using the default namenode (specified in hadoop config)

Returns:
Pydoop hdfs handle
hops.hdfs.get_fs()

Get a handle to pydoop fs using the default namenode (specified in hadoop config)

Returns:
Pydoop fs handle
hops.hdfs.get_plain_path(abs_path)

Convert absolute HDFS path to plain path (dropping hdfs:// and ip)

Example use-case:

>>> hdfs.get_plain_path("hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Models/")
>>> # returns: "/Projects/demo_deep_learning_admin000/Models/"
Args:
abs_path:the absolute HDFS path containing hdfs:// and/or ip
Returns:
the plain path without hdfs:// and ip
hops.hdfs.get_webhdfs_host()

Reads the ipc.server.ssl.enabled property from core-site.xml and returns webhdfs hostname.

Returns:
returns the webhdfs hostname.
hops.hdfs.get_webhdfs_port()

Reads the ipc.server.ssl.enabled property from core-site.xml and returns the webhdfs port.

Returns:
returns the webhdfs port.
hops.hdfs.glob(hdfs_path, recursive=False, project=None)

Finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order.

Globbing gives you the list of files in a directory that match a supplied pattern

>>> glob('Resources/*.json')
>>> ['Resources/1.json', 'Resources/2.json']

Python's glob is implemented using os.listdir() and fnmatch.fnmatch(); here glob is implemented using hdfs.ls() and fnmatch.filter().

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Raises:
IOError if the supplied hdfs path does not exist
Returns:
A possibly-empty list of path names that match pathname, which must be a string containing a path specification. pathname can be either absolute or relative.
hops.hdfs.is_tls_enabled()

Reads the ipc.server.ssl.enabled property from core-site.xml.

Returns:
returns True if ipc.server.ssl.enabled is true. False otherwise.
hops.hdfs.isdir(hdfs_path, project=None)

Return True if path refers to a directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a directory.

Raises: IOError

hops.hdfs.isfile(hdfs_path, project=None)

Return True if path refers to a file.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a file.

Raises: IOError

hops.hdfs.load(hdfs_path)

Read the content of hdfs_path and return it.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
the read contents of hdfs_path
hops.hdfs.log(string)

Logs a string to the log file

Args:
string:string to log
hops.hdfs.ls(hdfs_path, recursive=False, exclude_nn_addr=False)

lists a directory in HDFS

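A small illustrative sketch (the dataset name is an example):

>>> from hops import hdfs
>>> paths = hdfs.ls("Resources/")
>>> # paths is a list of hdfs paths for the entries in the Resources dataset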
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
returns a list of hdfs paths
hops.hdfs.lsl(hdfs_path, recursive=False, project=None)

Returns all the pathnames in the supplied directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
recursive:if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Returns:
A possibly-empty list of path names stored in the supplied path.
hops.hdfs.mkdir(hdfs_path, project=None)

Create a directory and its parents as needed.

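A minimal sketch (the directory name is illustrative):

>>> from hops import hdfs
>>> hdfs.mkdir("Resources/my_experiment_output")
>>> hdfs.exists("Resources/my_experiment_output")  # True once created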
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.move(src, dest)

Move or rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.open_file(hdfs_path, project=None, flags='rw', buff_size=0)

Opens an HDFS file for read/write/append and returns a file descriptor object (fd) that should be closed when no longer needed.

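A minimal sketch that writes a text file and closes the descriptor (the path is illustrative):

>>> from hops import hdfs
>>> fd = hdfs.open_file("Resources/notes.txt", flags="wt")
>>> fd.write("some text")
>>> fd.close()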
Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your project’s path in HDFS).
flags:supported opening modes are ‘r’, ‘w’, ‘a’. In addition, a trailing ‘t’ can be added to specify text mode (e.g. ‘rt’ = open for reading text).
buff_size:Pass 0 as buff_size if you want to use the “configured” values, i.e. the ones set in the Hadoop configuration files.
Returns:
A file descriptor (fd) that needs to be closed (fd.close()) when it is no longer needed.
Raises:
IOError: If the file does not exist.
hops.hdfs.project_id()

Get the Hopsworks project id from environment variables

Returns: the Hopsworks project id

hops.hdfs.project_name()

Extracts the project name from the project username (“project__user”) or from the environment if available

Returns:
project name
hops.hdfs.project_path(project=None, exclude_nn_addr=False)

Get the path in HopsFS where the HopsWorks project is located. To point to a particular dataset, this path should be appended with the name of your dataset.

>>> from hops import hdfs
>>> project_path = hdfs.project_path()
>>> print("Project path: {}".format(project_path))
Args:
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
returns the project absolute path
hops.hdfs.project_user()

Gets the project username (“project__user”) from environment variables

Returns:
the project username
hops.hdfs.rename(src, dest)

Rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.rmr(hdfs_path, project=None)

Recursively remove files and directories.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.stat(hdfs_path)

Performs the equivalent of os.stat() on hdfs_path, returning a StatResult object.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
returns a StatResult object for the given path

hops.hive module

hops.hive.setup_hive_connection()

Exports an environment variable with the Hive connection configuration so that it can be used by ipython-sql and PyHive to establish a connection with Hive

hops.jobs module

Utility functions to manage jobs in Hopsworks.

hops.jobs.create_job(name, job_config)

Create a job in Hopsworks

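A hedged sketch of creating and starting a job; the job_config keys shown here are illustrative only and depend on the job type and Hopsworks version:

>>> from hops import jobs
>>> job_config = {"jobType": "PYSPARK",                        # illustrative keys, not a complete schema
>>>               "appPath": "hdfs:///Projects/demo/Resources/my_app.py"}
>>> jobs.create_job("my_job", job_config)
>>> jobs.start_job("my_job")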
Args:
name: Name of the job to be created. job_config: A dictionary representing the job configuration
Returns:
HTTP(S)Connection
hops.jobs.get_current_execution(name)
hops.jobs.start_job(name, action='start')

Start an execution of the job. Only one execution can be active for a job.

Returns:
The job status.
hops.jobs.stop_job(name, action='stop')

Stop the current execution of the job.

Returns:
The job status.

hops.kafka module

A module for setting up Kafka Brokers and Consumers on the Hops platform. It hides the complexity of configuring Kafka by providing utility methods such as:

  • get_broker_endpoints().
  • get_security_protocol().
  • get_kafka_default_config().
  • etc.

Using these utility functions you can setup Kafka with the Kafka client-library of your choice, e.g SparkStreaming or confluent-kafka-python. For example, assuming that you have created a topic called “test” on Hopsworks and that you have installed confluent-kafka-python inside your project’s anaconda environment:

>>> from hops import kafka
>>> from confluent_kafka import Producer, Consumer
>>> TOPIC_NAME = "test"
>>> config = kafka.get_kafka_default_config()
>>> producer = Producer(config)
>>> consumer = Consumer(config)
>>> consumer.subscribe(["test"])
>>> # wait a little while before executing the rest of the code (put it in a different Jupyter cell)
>>> # so that the consumer gets a chance to subscribe (asynchronous call)
>>> for i in range(0, 10):
>>>     producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)
>>> # Trigger the sending of all messages to the brokers, 10sec timeout
>>> producer.flush(10)
>>> for i in range(0, 10):
>>>     msg = consumer.poll(timeout=5.0)
>>>     if msg is not None:
>>>         print('Consumed Message: {} from topic: {}'.format(msg.value(), msg.topic()))
>>>     else:
>>>         print("Topic empty, timeout when trying to consume message")

Similarly, you can define a pyspark kafka consumer as follows, using the spark session defined in variable spark

>>> from hops import kafka
>>> from hops import tls
>>> TOPIC_NAME = "test"
>>> df = spark \
>>>     .readStream \
>>>     .format("kafka") \
>>>     .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
>>>     .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
>>>     .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.keystore.location", tls.get_key_store()) \
>>>     .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
>>>     .option("subscribe", TOPIC_NAME) \
>>>     .load()
class hops.kafka.KafkaTopicDTO(kafka_topic_dto_json)

Bases: object

Represents a KafkaTopic in Hopsworks

__init__(kafka_topic_dto_json)

Initialize the kafka topic from JSON payload returned by Hopsworks REST API

Args:
kafka_topic_dto_json:
 JSON data about the kafka topic returned from Hopsworks REST API
hops.kafka.convert_json_schema_to_avro(json_schema)

Parses a JSON kafka topic schema returned by Hopsworks REST API into an avro schema

Args:
json_schema:the json schema to convert
Returns:
the avro schema
hops.kafka.get_broker_endpoints()

Get Kafka broker endpoints as a comma-separated string

Returns:
a string with broker endpoints comma-separated
hops.kafka.get_broker_endpoints_list()

Get Kafka broker endpoints as a list

Returns:
a list with broker endpoint strings
hops.kafka.get_kafka_default_config()

Gets a default configuration for running secure Kafka on Hops

Returns:
dict with config_property --> value
hops.kafka.get_schema(topic)

Gets the Avro schema for a particular Kafka topic.

Args:
topic:Kafka topic name
Returns:
Avro schema as a string object in JSON format
hops.kafka.get_security_protocol()

Gets the security protocol used for communicating with Kafka brokers in a Hopsworks cluster

Returns:
the security protocol for communicating with Kafka brokers in a Hopsworks cluster
hops.kafka.parse_avro_msg(msg, avro_schema)

Parses an avro record using a specified avro schema

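A hedged sketch that combines get_schema, convert_json_schema_to_avro and parse_avro_msg with a confluent-kafka consumer; the topic name and group id are illustrative, and the topic is assumed to carry Avro-encoded messages:

>>> from hops import kafka
>>> from confluent_kafka import Consumer
>>> config = kafka.get_kafka_default_config()
>>> config["group.id"] = "my_consumer_group"          # illustrative group id
>>> consumer = Consumer(config)
>>> consumer.subscribe(["test"])
>>> avro_schema = kafka.convert_json_schema_to_avro(kafka.get_schema("test"))
>>> msg = consumer.poll(timeout=5.0)
>>> if msg is not None:
>>>     record = kafka.parse_avro_msg(msg.value(), avro_schema)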
Args:
msg:the avro message to parse
avro_schema:the avro schema
Returns:
The parsed/decoded message

hops.numpy_helper module

API for reading/writing numpy arrays to/from HDFS

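A minimal round-trip sketch (the filename under Resources is illustrative):

>>> from hops import numpy_helper
>>> import numpy as np
>>> numpy_helper.save("Resources/my_array.npy", np.arange(10))
>>> arr = numpy_helper.load("Resources/my_array.npy")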
hops.numpy_helper.load(hdfs_filename, **kwds)

Reads a file from HDFS into a Numpy Array

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
**kwds:You can add any additional args found in numpy.read(…)
Returns:
A numpy array
Raises:
IOError: If the file does not exist
hops.numpy_helper.save(hdfs_filename, data)

Saves a numpy array to a file in HDFS

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
data:numpy array
Raises:
IOError: If the local file does not exist

hops.pandas_helper module

API for opening csv files into Pandas from HDFS

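A minimal round-trip sketch (the filename and data are illustrative):

>>> from hops import pandas_helper
>>> import pandas as pd
>>> df = pd.DataFrame({"id": [1, 2], "value": [0.1, 0.2]})
>>> pandas_helper.write_csv("Resources/sample.csv", df)
>>> df2 = pandas_helper.read_csv("Resources/sample.csv")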
hops.pandas_helper.read_csv(hdfs_filename, **kwds)

Reads a comma-separated values (csv) file from HDFS into a Pandas DataFrame

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
**kwds:You can add any additional args found in pandas.read_csv(…)
Returns:
A pandas dataframe
Raises:
IOError: If the file does not exist
hops.pandas_helper.write_csv(hdfs_filename, dataframe, **kwds)

Writes a pandas dataframe to a comma-separated values (csv) text file in HDFS. Overwrites the file if it already exists

Args:
hdfs_filename:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS)
dataframe:a Pandas dataframe
**kwds:You can add any additional args found in pandas.read_csv(…)
Raises:
IOError: If the file does not exist

hops.serving module

Utility functions to export models to the Models dataset and get information about models currently being served in the project.

class hops.serving.Serving(serving_json)

Bases: object

Represents a model being served in Hopsworks

__init__(serving_json)

Initialize the serving from JSON payload returned by Hopsworks REST API

Args:
serving_json:JSON data about the serving returned from Hopsworks REST API
exception hops.serving.ServingNotFound

Bases: exceptions.Exception

This exception will be raised if the requested serving could not be found

hops.serving.create_or_update(artifact_path, serving_name, serving_type='TENSORFLOW', model_version=1, batching_enabled=False, topic_name='CREATE', num_partitions=1, num_replicas=1, instances=1, update=False)

Creates or updates a serving in Hopsworks

Example use-case:

>>> from hops import serving
>>> serving.create_or_update("/Models/mnist", "mnist", "TENSORFLOW", 1)
Args:
artifact_path:path to the artifact to serve (tf model dir or sklearn script)
serving_name:name of the serving to create
serving_type:type of the serving, e.g “TENSORFLOW” or “SKLEARN”
model_version:version of the model to serve
batching_enabled:
 boolean flag whether to enable batching for the inference requests
update:boolean flag whether to update existing serving, otherwise it will try to create a new serving
instances:the number of serving instances (the more instances the more inference requests can be served in parallel)

Returns:
None
hops.serving.delete(serving_name)

Deletes serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.delete("irisFlowerClassifier")
Args:
serving_name:name of the serving to delete
Returns:
None
hops.serving.exists(serving_name)

Checks if there exists a serving with the given name

Example use-case:

>>> from hops import serving
>>> serving.exists(serving_name)
Args:
serving_name:the name of the serving
Returns:
True if the serving exists, otherwise False
hops.serving.export(model_path, model_name, model_version=1, overwrite=False)

Copies a trained model to the Models directory in the project and creates the directory structure of:

>>> Models
>>>      |
>>>      - model_name
>>>                 |
>>>                 - version_x
>>>                 |
>>>                 - version_y

For example if you run this:

>>> serving.export("iris_knn.pkl", "irisFlowerClassifier", 1, overwrite=True)

it will copy the local model file “iris_knn.pkl” to /Projects/projectname/Models/irisFlowerClassifier/1/iris_knn.pkl on HDFS, and overwrite in case there already exists a file with the same name in the directory.

If you run:

>>> serving.export("Resources/iris_knn.pkl", "irisFlowerClassifier", 1, overwrite=True)

it will first check if the path Resources/iris_knn.pkl exists on your local filesystem in the current working directory. If the path was not found, it will check in your project’s HDFS directory and if it finds the model there it will copy it to /Projects/projectname/Models/irisFlowerClassifier/1/iris_knn.pkl

If “model” is a local directory exported by TensorFlow, and you run:

>>> serving.export("/model/", "mnist", 1, overwrite=True)

It will copy the model directory contents to /Projects/projectname/Models/mnist/1/, e.g. the “model.pb” file and the “variables” directory.

Args:
model_path:path to the trained model (HDFS or local)
model_name:name of the model/serving
model_version:version of the model/serving
overwrite:boolean flag whether to overwrite in case a serving already exists in the exported directory
Returns:
The path to where the model was exported
Raises:
ValueError:if there was an error with the exportation of the model due to invalid user input
hops.serving.get_all()

Gets the list of servings for the current project

Example:

>>> from hops import serving
>>> servings = serving.get_all()
>>> servings[0].name
Returns:
list of servings
hops.serving.get_artifact_path(serving_name)

Gets the artifact path of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_artifact_path(serving_name)
Args:
serving_name:name of the serving to get the artifact path for
Returns:
the artifact path of the serving (model path in case of tensorflow, or python script in case of SkLearn)
hops.serving.get_id(serving_name)

Gets the id of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_id(serving_name)
Args:
serving_name:name of the serving to get the id for
Returns:
the id of the serving
hops.serving.get_kafka_topic(serving_name)

Gets the kafka topic name of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_kafka_topic(serving_name)
Args:
serving_name:name of the serving to get the kafka topic name for
Returns:
the kafka topic name of the serving
hops.serving.get_status(serving_name)

Gets the status of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_status(serving_name)
Args:
serving_name:name of the serving to get the status for
Returns:
the status of the serving
hops.serving.get_type(serving_name)

Gets the type of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_type(serving_name)
Args:
serving_name:name of the serving to get the type for
Returns:
the type of the serving (e.g Tensorflow or SkLearn)
hops.serving.get_version(serving_name)

Gets the version of a serving with a given name

Example use-case:

>>> from hops import serving
>>> serving.get_version(serving_name)
Args:
serving_name:name of the serving to get the version for
Returns:
the version of the serving
hops.serving.make_inference_request(serving_name, data, verb=':predict')

Submit an inference request

Example use-case:

>>> from hops import serving
>>> serving.make_inference_request("irisFlowerClassifier", [[1,2,3,4]], ":predict")
Args:
serving_name:name of the model being served
data:data/json to send to the serving
verb:type of request (:predict, :classify, or :regress)
Returns:
the JSON response
hops.serving.start(serving_name)

Starts a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.start("irisFlowerClassifier")
Args:
serving_name:name of the serving to start
Returns:
None
hops.serving.stop(serving_name)

Stops a model serving instance with a given name

Example use-case:

>>> from hops import serving
>>> serving.stop("irisFlowerClassifier")
Args:
serving_name:name of the serving to stop
Returns:
None

hops.tensorboard module

Utility functions to manage the lifecycle of TensorBoard and get the path to write TensorBoard events.

hops.tensorboard.interactive_debugger()

Returns: address for interactive debugger in TensorBoard

hops.tensorboard.logdir()

Get the TensorBoard logdir. This function should be called in your wrapper function for Experiment, Parallel Experiment or Distributed Training and passed as the logdir for TensorBoard.

Case 1: local_logdir=True, then the logdir is on the local filesystem, otherwise it is in the folder for your experiment in your project in HDFS. Once the experiment is finished all the files that are present in the directory will be uploaded to your experiment directory in the Experiments dataset.

Case 2: local_logdir=False, then the logdir is in HDFS in your experiment directory in the Experiments dataset.

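A hedged sketch of how the logdir is typically consumed inside an experiment wrapper function (the Keras callback is just one possible consumer):

>>> from hops import tensorboard
>>> def train_fn():
>>>     logdir = tensorboard.logdir()
>>>     # hand logdir to whatever writes TensorBoard events,
>>>     # e.g. tf.keras.callbacks.TensorBoard(log_dir=logdir)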
Returns:
The path to store files for your experiment. The content is also visualized in TensorBoard.
hops.tensorboard.non_interactive_debugger()

Returns: address for non-interactive debugger in TensorBoard

hops.tensorboard.visualize(hdfs_root_logdir)

Visualize all TensorBoard events for a given path in HopsFS. This is intended for use after running TensorFlow jobs, to visualize them all in the same TensorBoard. tflauncher.launch returns the path in HopsFS which should be passed as the argument to this method to visualize all runs.

Args:
hdfs_root_logdir:
 the path in HopsFS to enter as the logdir for TensorBoard
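A brief usage sketch, where hdfs_root_logdir is the HopsFS path returned by the launcher as described above:

>>> from hops import tensorboard
>>> # visualize every run under the experiment root in a single TensorBoard
>>> tensorboard.visualize(hdfs_root_logdir)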

hops.tls module

A module for getting TLS certificates in YARN containers, used for setting up Kafka inside Jobs/Notebooks on Hops.

hops.tls.get_ca_chain_location()

Get the location of the chain of CA certificates (PEM format) required to validate the client certificate used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to the CA certificate chain in PEM format
hops.tls.get_client_certificate_location()

Get the location of the client certificate (PEM format) for the client private key, signed by a trusted CA and used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to client certificate in PEM format
hops.tls.get_client_key_location()

Get the location of the client private key (PEM format) used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to client private key in PEM format
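For illustration, a sketch of wiring these PEM locations into a Python Kafka client, here confluent_kafka (an assumption; the hops.kafka module normally assembles this configuration for you), with a placeholder broker list:

>>> from hops import tls
>>> from confluent_kafka import Producer  # assumption: confluent_kafka is installed
>>> config = {
...     "bootstrap.servers": broker_endpoints,  # placeholder: your Kafka broker list
...     "security.protocol": "SSL",
...     "ssl.ca.location": tls.get_ca_chain_location(),
...     "ssl.certificate.location": tls.get_client_certificate_location(),
...     "ssl.key.location": tls.get_client_key_location(),
... }
>>> producer = Producer(config)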
hops.tls.get_key_store()
hops.tls.get_key_store_cert()

Get the keystore certificate from the local container

Returns:
the keystore certificate
hops.tls.get_key_store_pwd()

Get keystore password

Returns:
keystore password
hops.tls.get_trust_store()
hops.tls.get_trust_store_pwd()

Get truststore password

Returns:
truststore password
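The JKS keystore/truststore variants are useful for JVM-based clients, for example Spark structured streaming against Kafka. A sketch of the option map, assuming get_key_store() and get_trust_store() return local paths to the JKS files and spark is the active SparkSession; broker list and topic are placeholders:

>>> from hops import tls
>>> df = (spark.readStream
...     .format("kafka")
...     .option("kafka.bootstrap.servers", broker_endpoints)  # placeholder broker list
...     .option("subscribe", topic_name)                      # placeholder topic
...     .option("kafka.security.protocol", "SSL")
...     .option("kafka.ssl.truststore.location", tls.get_trust_store())
...     .option("kafka.ssl.truststore.password", tls.get_trust_store_pwd())
...     .option("kafka.ssl.keystore.location", tls.get_key_store())
...     .option("kafka.ssl.keystore.password", tls.get_key_store_pwd())
...     .load())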

hops.util module

Miscellaneous utility functions for user applications.

hops.util.abspath(hdfs_path)
hops.util.get_api_key_aws(project_name)

Retrieves the Hopsworks API key for the given project from AWS Secrets Manager.

Args:
project_name:name of the Hopsworks project
Returns:
the API key
hops.util.get_job_name()

If this method is called from inside a Hopsworks job, it returns the name of the job.

Returns:
the name of the Hopsworks job
hops.util.get_jwt()

Retrieves the JWT from the local container.

Returns:
the content of the jwt.token file in the local container
hops.util.get_redshift_username_password(region_name, cluster_identifier, user, database)

Requests temporary Redshift credentials, valid for 3600 seconds, for the given cluster, user and database.

Args:
region_name:the AWS region name
cluster_identifier:
 the Redshift cluster identifier
user:the Redshift user to get credentials for
database:the Redshift database
Returns:
user, password
hops.util.get_requests_verify(hostname, port)

Get the verification method for sending HTTP requests to Hopsworks. Credit to https://gist.github.com/gdamjan/55a8b9eec6cf7b771f92021d93b87b2c

Args:
hostname:the Hopsworks hostname
port:the Hopsworks REST port
Returns:
if the env var HOPS_UTIL_VERIFY is not false: the path to the truststore (PEM) if the Hopsworks certificate is self-signed, otherwise True. If HOPS_UTIL_VERIFY is false: False.
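The returned value plugs straight into the verify parameter of the requests library; a sketch with placeholder host, port and resource path:

>>> import requests
>>> from hops import util
>>> verify = util.get_requests_verify("hopsworks.example.com", "443")  # placeholders
>>> # verify is a truststore path, True, or False, which is exactly what requests expects
>>> requests.get("https://hopsworks.example.com:443/hopsworks-api/api", verify=verify)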

hops.util.grid_params(dict)

Generate all possible combinations (Cartesian product) of the hyperparameter values

Args:
dict:a dict with hyperparameter names as keys and lists of candidate values as values
Returns:
A new dictionary with a grid of all the possible hyperparameter combinations
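A small illustrative call, assuming the input maps each hyperparameter name to the list of values to try:

>>> from hops import util
>>> search_space = {'learning_rate': [0.1, 0.01], 'dropout': [0.3, 0.5]}
>>> # Cartesian product: 2 x 2 = 4 combinations
>>> grid = util.grid_params(search_space)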
hops.util.num_executors()

Get the number of executors configured for Jupyter

Returns:
Number of configured executors for Jupyter
hops.util.num_param_servers()

Get the number of parameter servers configured for Jupyter

Returns:
Number of configured parameter servers for Jupyter
hops.util.parse_redhift_jdbc_url(url)

Parses a Redshift JDBC URL and extracts region_name, cluster_identifier, database and user.

Args:
url:the JDBC URL
Returns:
region_name, cluster_identifier, database, user
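The two Redshift helpers compose naturally: parse the JDBC URL into its components, then request temporary credentials for them. A sketch with a placeholder URL, using psycopg2 (an assumption) to open the connection:

>>> from hops import util
>>> import psycopg2  # assumption: psycopg2 is available in the environment
>>> url = "jdbc:redshift://my-cluster.abc123.eu-west-1.redshift.amazonaws.com:5439/dev"  # placeholder
>>> region_name, cluster_identifier, database, user = util.parse_redhift_jdbc_url(url)
>>> db_user, db_password = util.get_redshift_username_password(region_name, cluster_identifier, user, database)
>>> conn = psycopg2.connect(
...     host="my-cluster.abc123.eu-west-1.redshift.amazonaws.com",  # placeholder endpoint
...     port=5439, dbname=database, user=db_user, password=db_password)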
hops.util.send_request(method, resource, data=None, headers=None)

Sends a request to Hopsworks. In case of an Unauthorized response, the request is submitted once more, as the jwt might not have been read properly from the local container.

Args:
method:HTTP(S) method
resource:Hopsworks resource
data:HTTP(S) payload
headers:HTTP(S) headers
Returns:
HTTP(S) response
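A hedged sketch of a GET request against a Hopsworks REST resource (the resource path is a placeholder for illustration; the jwt-based auth header is assumed to be set internally, consistent with the retry-on-Unauthorized behaviour described above):

>>> from hops import util
>>> # GET a Hopsworks REST resource and inspect the HTTP(S) response
>>> response = util.send_request("GET", "/hopsworks-api/api/project")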
hops.util.set_auth_header(headers)

Set the authorization header for HTTP requests to Hopsworks, depending on whether the setup is remote or not.

Args:
headers:the HTTP headers to add the authorization header to

Module contents