hops package

Submodules

hops.devices module

Utility functions to retrieve information about available devices in the environment.

hops.devices.get_num_gpus()

Get the number of GPUs available in the environment, and consequently available to the application

Assuming there is one GPU in the environment

>>> from hops import devices
>>> devices.get_num_gpus()
>>> 1
Returns:
Number of GPUs available in the environment

hops.experiment module

Experiment module used for running Experiments, Parallel Experiments and Distributed Training on Hopsworks.

The programming model is that you wrap the code to run inside a wrapper function. Inside that wrapper function provide all imports and parts that make up your experiment, see examples below. Whenever a function to run an experiment is invoked it is also registered in the Experiments service along with the provided information.

Three different types of experiments
  • Run a single standalone Experiment using the launch function.
  • Run Parallel Experiments performing hyperparameter optimization using grid_search or differential_evolution.
  • Run single or multi-machine Distributed Training using parameter_server or collective_all_reduce.
hops.experiment.begin(name='no-name', local_logdir=False, versioned_resources=None, description=None)

Start a custom Experiment; at the end of the experiment, call end(metric).

IMPORTANT - This call should not be combined with other functions in the experiment module, other than end. Other experiment functions, such as grid_search, manage the begin and end functions internally.

Example usage:

>>> from hops import experiment
>>> experiment.begin(name='calculate pi')
>>> # Code to calculate pi
>>> pi = calc_pi()
>>> experiment.end(pi)
Args:
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.collective_all_reduce(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training

Sets up the cluster to run CollectiveAllReduceStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...CollectiveAllReduceStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.collective_all_reduce(distributed_training, local_logdir=True)
Args:
map_fun:the function containing code to run CollectiveAllReduceStrategy
name:the name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.differential_evolution(objective_function, boundary_dict, direction='max', generations=10, population=10, mutation=0.5, crossover=0.7, cleanup_generations=False, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Parallel Experiment

Run differential evolution to explore a given search space for each hyperparameter and figure out the best hyperparameter combination. The function is treated as a blackbox that returns a metric for some given hyperparameter combination. The returned metric is used to evaluate how ‘good’ the hyperparameter combination was.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate':[0.01, 0.2], 'dropout': [0.1, 0.9]}
>>> def train_nn(learning_rate, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, dropout)
>>> experiment.differential_evolution(train_nn, boundary_dict, direction='max')
Args:
objective_function:
 the function to run, must return a metric
boundary_dict:a dict where each key corresponds to an argument of objective_function and the corresponding value should be a list of two elements: the first element being the lower bound for the parameter and the second element the upper bound.
direction:‘max’ to maximize the returned metric, ‘min’ to minimize the returned metric
generations:number of generations
population:size of population
mutation:mutation rate to explore more different hyperparameters
crossover:how fast to adapt the population to the best in each generation
cleanup_generations:
 remove previous generations from HDFS, only keep the last 2
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored, dict with best hyperparameters
hops.experiment.end(metric=None)

End a custom Experiment previously registered with begin and register a metric to associate with it.

Args:
metric:The metric to associate with the Experiment

Parallel Experiment

Run multiple Experiments with grid_search to test a grid of hyperparameters, for example to maximize a neural network’s accuracy.

The following example will run train_nn with 6 different hyperparameter combinations

>>> from hops import experiment
>>> grid_dict = {'learning_rate':[0.1, 0.3], 'dropout': [0.4, 0.6, 0.1]}
>>> def train_nn(learning_rate, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, dropout)
>>> experiment.grid_search(train_nn, grid_dict, direction='max')

The following combinations will be injected into the function, and each will be run and evaluated as a separate Experiment.

  • (learning_rate=0.1, dropout=0.4)
  • (learning_rate=0.1, dropout=0.6)
  • (learning_rate=0.1, dropout=0.1)
  • (learning_rate=0.3, dropout=0.4)
  • (learning_rate=0.3, dropout=0.6)
  • (learning_rate=0.3, dropout=0.1)
Args:
map_fun:the function to run, must return a metric
args_dict:a dict with a key for each argument with a corresponding value being a list containing the hyperparameters to test, internally all possible combinations will be generated and run as separate Experiments
direction:‘max’ to maximize the returned metric, ‘min’ to minimize the returned metric
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.launch(map_fun, args_dict=None, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Experiment or Parallel Experiment

Run an Experiment contained in map_fun one time with no arguments or multiple times with different arguments if args_dict is specified.

Example usage:

>>> from hops import experiment
>>> def train_nn():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    logdir = tensorboard.logdir()
>>>    # code for preprocessing, training and exporting model
>>>    # optionally return a value for the experiment which is registered in Experiments service
>>> experiment.launch(train_nn)
Args:
map_fun:The function to run
args_dict:If specified, the function will be run multiple times with different arguments; {‘a’:[1,2], ‘b’:[5,3]} would run the function two times, with arguments (1,5) and (2,3), provided that the function signature contains two arguments, e.g. def func(a,b):
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
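
For example, a minimal sketch of running the same wrapper function with two argument combinations via args_dict (the hyperparameter names and values are only illustrative, and the returned metric here is a dummy):

>>> from hops import experiment
>>> args_dict = {'learning_rate': [0.001, 0.01], 'dropout': [0.5, 0.7]}
>>> def train_nn(learning_rate, dropout):
>>>    # code for preprocessing, training and exporting the model goes here
>>>    # optionally return a metric to register in the Experiments service
>>>    return learning_rate * dropout
>>> # runs train_nn twice, with (0.001, 0.5) and (0.01, 0.7)
>>> experiment.launch(train_nn, args_dict=args_dict)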
hops.experiment.mirrored(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training single machine - multiple GPUs

Example usage:

>>> from hops import experiment
>>> def mirrored_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...MirroredStrategy()...
>>> experiment.mirrored(mirrored_training)
Args:
map_fun:contains the code where you are using MirroredStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored
hops.experiment.parameter_server(map_fun, name='no-name', local_logdir=False, versioned_resources=None, description=None)

Distributed Training

Sets up the cluster to run ParameterServerStrategy.

TF_CONFIG is exported in the background and does not need to be set by the user.

Example usage:

>>> from hops import experiment
>>> def distributed_training():
>>>    import tensorflow
>>>    from hops import tensorboard
>>>    from hops import devices
>>>    logdir = tensorboard.logdir()
>>>    ...ParameterServerStrategy(num_gpus_per_worker=devices.get_num_gpus())...
>>> experiment.parameter_server(distributed_training, local_logdir=True)
Args:
map_fun:contains the code where you are using ParameterServerStrategy.
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:a longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored

Parallel Experiment

Run the Experiment contained in map_fun with random_search for a number of random hyperparameter samples, controlled by the samples parameter. Each hyperparameter is contained in boundary_dict with the key corresponding to the name of the hyperparameter and a list containing two elements defining the lower and upper bound. The experiment must return a metric corresponding to how ‘good’ the given hyperparameter combination is.

Example usage:

>>> from hops import experiment
>>> boundary_dict = {'learning_rate': [0.1, 0.3], 'layers': [2, 9], 'dropout': [0.1,0.9]}
>>> def train_nn(learning_rate, layers, dropout):
>>>    import tensorflow
>>>    # code for preprocessing, training and exporting model
>>>    # mandatory return a value for the experiment which is registered in Experiments service
>>>    return network.evaluate(learning_rate, layers, dropout)
>>> experiment.random_search(train_nn, boundary_dict, samples=14, direction='max')
Args:
map_fun:The function to run
boundary_dict:dict containing hyperparameter names and corresponding boundaries; each experiment samples a random value within the boundary range.
direction:If set to ‘max’ the highest value returned will correspond to the best solution, if set to ‘min’ the opposite is true
samples:the number of random samples to evaluate for each hyperparameter given the boundaries
name:name of the experiment
local_logdir:True if tensorboard.logdir() should be in the local filesystem, otherwise it is in HDFS
versioned_resources:
 A list of HDFS paths of resources to version with this experiment
description:A longer description for the experiment
Returns:
HDFS path in your project where the experiment is stored

hops.facets module

Google Facets utility functions.

hops.facets.dive(jsonstr)

Display a JSON string in a Facets Dive visualization (HTML)

Args:
jsonstr:the JSON string to visualize
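
A minimal sketch, assuming sample_df is a pandas DataFrame you want to visualize (the variable name is only illustrative):

>>> from hops import facets
>>> jsonstr = sample_df.to_json(orient='records')
>>> facets.dive(jsonstr)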
hops.facets.overview(train_data, test_data)

Calculate the feature statistics proto from the datasets, stringify it, and display it in a Facets Overview

Args:
train_data:the train data
test_data:the test data
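
A minimal sketch, assuming train_df and test_df are pandas DataFrames holding the train and test data (the variable names are only illustrative):

>>> from hops import facets
>>> facets.overview(train_df, test_df)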

hops.featurestore module

A feature store client. This module exposes an API for interacting with feature stores in Hopsworks. It hides complexity and provides utility methods such as:

  • project_featurestore().
  • get_featuregroup().
  • get_feature().
  • get_features().
  • sql()
  • insert_into_featuregroup()
  • get_featurestore_metadata()
  • get_project_featurestores()
  • get_featuregroups()
  • get_training_datasets()

Below are some example usages of this API (assuming you have two featuregroups called ‘trx_graph_summary_features’ and ‘trx_summary_features’ with the following schemas:

|– cust_id: integer (nullable = true)

|– pagerank: float (nullable = true)

|– triangle_count: float (nullable = true)

and

|– cust_id: integer (nullable = true)

|– min_trx: float (nullable = true)

|– max_trx: float (nullable = true)

|– avg_trx: float (nullable = true)

|– count_trx: long (nullable = true)

, respectively).

>>> from hops import featurestore
>>> # Get feature group example
>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features", featurestore=featurestore.project_featurestore(), featuregroup_version = 1)
>>>
>>> # Get single feature example
>>> #The API will infer the featuregroup and default to version 1 for the feature group with this and the project's own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx", featurestore=featurestore.project_featurestore(), featuregroup="trx_summary_features", featuregroup_version = 1)
>>> # When you want to get features from different feature groups the API will infer how to join the features together
>>>
>>> # Get list of features example
>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"], featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"], featurestore=featurestore.project_featurestore(), featuregroups_version_dict={"trx_graph_summary_features": 1, "trx_summary_features": 1}, join_key="cust_id")
>>>
>>> # Run SQL query against feature store example
>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5", featurestore=featurestore.project_featurestore()).show(5)
>>>
>>> # Insert into featuregroup example
>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features", featurestore=featurestore.project_featurestore(), featuregroup_version=1, mode="append", descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>>
>>> # Get featurestore metadata example
>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be specified with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore())
>>>
>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be specified with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
>>>
>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
>>> # By default `get_featurestore_metadata` will use the project's feature store, but this can also be specified with the optional argument featurestore
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
>>>
>>> # Compute featuregroup statistics (feature correlation, descriptive stats, feature distributions etc.) with Spark that will show up in the Featurestore Registry in Hopsworks
>>> # The API will default to the project's featurestore, featuregroup version 1, and compute all statistics for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True,feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for a certain set of columns, and for example exclude surrogate key columns, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
>>>
>>> # Create featuregroup from an existing dataframe
>>> # In most cases it is recommended that featuregroups are created in the UI on Hopsworks and that care is taken in documenting the featuregroup.
>>> # However, sometimes it is practical to create a featuregroup directly from a spark dataframe and fill in the metadata about the featuregroup later in the UI.
>>> # This can be done through the create_featuregroup API function
>>>
>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for the new featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2", description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2", description="trx_summary_features without the column count_trx",featurestore=featurestore.project_featurestore(),featuregroup_version=1, job_name=None, dependencies=[], descriptive_statistics=False, feature_correlation=False, feature_histograms=False, cluster_analysis=False, stat_columns=None)
>>>
>>> # After you have found the features you need in the featurestore you can materialize the features into a training dataset
>>> # so that you can train a machine learning model using the features. Just as for featuregroups,
>>> # it is useful to version and document training datasets; for this reason HopsML supports managed training datasets, which enable
>>> # you to easily version, document and automate the materialization of training datasets.
>>>
>>> # First we select the features (and/or labels) that we want
>>> dataset_df = featurestore.get_features(["pagerank", "triangle_count", "avg_trx", "count_trx", "max_trx", "min_trx","balance", "number_of_accounts"],featurestore=featurestore.project_featurestore())
>>> # Now we can create a training dataset from the dataframe with some extended metadata such as schema (automatically inferred).
>>> # By default when you create a training dataset it will be in "tfrecords" format and statistics will be computed for all features.
>>> # After the dataset has been created you can view and/or update the metadata about the training dataset from the Hopsworks featurestore UI
>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="", featurestore=featurestore.project_featurestore(), data_format="csv", training_dataset_version=1, job_name=None, dependencies=[], descriptive_statistics=False, feature_correlation=False, feature_histograms=False, cluster_analysis=False, stat_columns=None)
>>>
>>> # Once a dataset has been created, its metadata is browsable in the featurestore registry in the Hopsworks UI.
>>> # If you don't want to create a new training dataset but just overwrite or insert new data into an existing training dataset,
>>> # you can use the API function 'insert_into_training_dataset'
>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default the insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics, this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset", featurestore=featurestore.project_featurestore(), training_dataset_version=1, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>>
>>> # After a managed dataset has been created, it is easy to share it and re-use it for training various models.
>>> # For example if the dataset has been materialized in tf-records format you can call the method get_training_dataset_path(training_dataset)
>>> # to get the HDFS path and read it directly in your tensorflow code.
>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1, but this can be overridden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(), training_dataset_version=1)
hops.featurestore.create_featuregroup(df, featuregroup, primary_key=None, description='', featurestore=None, featuregroup_version=1, job_name=None, dependencies=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5)

Creates a new featuregroup from a dataframe of features (sends the metadata to Hopsworks with a REST call to create the Hive table and store the metadata, and then inserts the data of the spark dataframe into the newly created table).

Example usage:

>>> # By default the new featuregroup will be created in the project's featurestore and the statistics for the new featuregroup will be computed based on the provided spark dataframe.
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2", description="trx_summary_features without the column count_trx")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.create_featuregroup(trx_summary_df1, "trx_summary_features_2_2", description="trx_summary_features without the column count_trx",featurestore=featurestore.project_featurestore(),featuregroup_version=1, job_name=None, dependencies=[], descriptive_statistics=False, feature_correlation=False, feature_histograms=False, cluster_analysis=False, stat_columns=None)
Args:
df:the dataframe to create the featuregroup for (used to infer the schema)
featuregroup:the name of the new featuregroup
primary_key:the primary key of the new featuregroup; if not specified, the first column in the dataframe will be used as the primary key
description:a description of the featuregroup
featurestore:the featurestore of the featuregroup (defaults to the project’s featurestore)
featuregroup_version:
 the version of the featuregroup (defaults to 1)
job_name:the name of the job to compute the featuregroup
dependencies:list of the datasets that this featuregroup depends on (e.g input datasets to the feature engineering job)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.create_training_dataset(df, training_dataset, description='', featurestore=None, data_format='tfrecords', training_dataset_version=1, job_name=None, dependencies=[], descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5)

Creates a new training dataset from a dataframe, saves metadata about the training dataset to the database, and saves the materialized dataset to HDFS

Example usage:

>>> featurestore.create_training_dataset(dataset_df, "AML_dataset")
>>> # You can override the default configuration if necessary:
>>> featurestore.create_training_dataset(dataset_df, "TestDataset", description="", featurestore=featurestore.project_featurestore(), data_format="csv", training_dataset_version=1, job_name=None, dependencies=[], descriptive_statistics=False, feature_correlation=False, feature_histograms=False, cluster_analysis=False, stat_columns=None)
Args:
df:the dataframe to create the training dataset from
training_dataset:
 the name of the training dataset
description:a description of the training dataset
featurestore:the featurestore that the training dataset is linked to
data_format:the format of the materialized training dataset
training_dataset_version:
 the version of the training dataset (defaults to 1)
job_name:the name of the job to compute the training dataset
dependencies:list of the datasets that this training dataset depends on (e.g input datasets to the feature engineering job)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.get_dataframe_tf_record_schema(spark_df)

Infers the tf-record schema from a spark dataframe. Note: this method is just for convenience; it should work in 99% of cases but it is not guaranteed. If spark or tensorflow introduces new datatypes this will break. The user can always fall back to encoding the tf-example schema manually.

Args:
spark_df:the spark dataframe to infer the tensorflow example record from
Returns:
a dict with the inferred tensorflow example schema
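
A minimal sketch, reusing the example featuregroup trx_summary_features from the module overview:

>>> from hops import featurestore
>>> spark_df = featurestore.get_featuregroup("trx_summary_features")
>>> # infer the tf-record schema for the dataframe
>>> tf_record_schema = featurestore.get_dataframe_tf_record_schema(spark_df)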
hops.featurestore.get_feature(feature, featurestore=None, featuregroup=None, featuregroup_version=1, dataframe_type='spark')

Gets a particular feature (column) from a featurestore. If no featuregroup is specified, it queries the hopsworks metastore to see if the feature exists in any of the featuregroups in the featurestore. If the user knows which featuregroup contains the feature, it should be specified, as this will improve the performance of the query.

Example usage:

>>> #The API will infer the featuregroup and default to version 1 for the feature group with this and the project's own feature store
>>> max_trx_feature = featurestore.get_feature("max_trx")
>>> #You can also explicitly define feature group,version and feature store:
>>> max_trx_feature = featurestore.get_feature("max_trx", featurestore=featurestore.project_featurestore(), featuregroup="trx_summary_features", featuregroup_version = 1)
Args:
feature:the feature name to get
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroup:(Optional) the featuregroup where the feature resides
featuregroup_version:
 (Optional) the version of the featuregroup
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
A spark dataframe with the feature
hops.featurestore.get_featuregroup(featuregroup, featurestore=None, featuregroup_version=1, dataframe_type='spark')

Gets a featuregroup from a featurestore as a spark dataframe

Example usage:

>>> #The API will default to version 1 for the feature group and the project's own feature store
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features")
>>> #You can also explicitly define version and feature store:
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features", featurestore=featurestore.project_featurestore(), featuregroup_version = 1)
Args:
featuregroup:the featuregroup to get
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroup_version:
 (Optional) the version of the featuregroup
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
a spark dataframe with the contents of the featuregroup
hops.featurestore.get_featuregroups(featurestore=None)

Gets a list of all featuregroups in a featurestore

>>> # List all Feature Groups in a Feature Store
>>> featurestore.get_featuregroups()
>>> # By default `get_featuregroups()` will use the project's feature store, but this can also be specified with the optional argument `featurestore`
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to list featuregroups for, defaults to the project-featurestore
Returns:
A list of names of the featuregroups in this featurestore
hops.featurestore.get_features(features, featurestore=None, featuregroups_version_dict={}, join_key=None, dataframe_type='spark')

Gets a list of features (columns) from the featurestore. If no featuregroup is specified it will query hopsworks metastore to find where the features are stored.

Example usage:

>>> # The API will default to version 1 for feature groups and the project's feature store
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"], featurestore=featurestore.project_featurestore())
>>> #You can also explicitly define feature group, version, feature store, and join-key:
>>> features = featurestore.get_features(["pagerank", "triangle_count", "avg_trx"], featurestore=featurestore.project_featurestore(), featuregroups_version_dict={"trx_graph_summary_features": 1, "trx_summary_features": 1}, join_key="cust_id")
Args:
features:a list of features to get from the featurestore
featurestore:the featurestore where the featuregroup resides, defaults to the project’s featurestore
featuregroups_version_dict:
 (Optional) a dict with (featuregroup –> version) for all the featuregroups where the features reside
join_key:(Optional) column name to join on
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
A spark dataframe with all the features
hops.featurestore.get_features_list(featurestore=None)

Gets a list of all features in a featurestore

>>> # List all Features in a Feature Store
>>> featurestore.get_features_list()
>>> # By default `get_features_list()` will use the project's feature store, but this can also be specified with the optional argument `featurestore`
>>> featurestore.get_features_list(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to list features for, defaults to the project-featurestore
Returns:
A list of names of the features in this featurestore
hops.featurestore.get_featurestore_metadata(featurestore=None)

Sends a REST call to Hopsworks to get the list of featuregroups and their features for the given featurestore.

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.get_featurestore_metadata()
>>> # You can also explicitly define the feature store
>>> featurestore.get_featurestore_metadata(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to query metadata of
Returns:
A list of featuregroups and their metadata
hops.featurestore.get_latest_featuregroup_version(featuregroup, featurestore=None)

Utility method to get the latest version of a particular featuregroup

Args:
featuregroup:the featuregroup to get the latest version of
featurestore:the featurestore where the featuregroup resides
Returns:
the latest version of the featuregroup in the feature store
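
A minimal sketch, reusing the example featuregroup trx_summary_features from the module overview:

>>> from hops import featurestore
>>> version = featurestore.get_latest_featuregroup_version("trx_summary_features")
>>> trx_summary_features = featurestore.get_featuregroup("trx_summary_features", featuregroup_version=version)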
hops.featurestore.get_latest_training_dataset_version(training_dataset, featurestore=None)

Utility method to get the latest version of a particular training dataset

Args:
training_dataset:
 the training dataset to get the latest version of
featurestore:the featurestore where the training dataset resides
Returns:
the latest version of the training dataset in the feature store
hops.featurestore.get_project_featurestores()

Gets all featurestores for the current project

Example usage:

>>> # Get list of featurestores accessible by the current project example
>>> featurestore.get_project_featurestores()
Returns:
A list of all featurestores that the project has access to
hops.featurestore.get_training_dataset(training_dataset, featurestore=None, training_dataset_version=1, dataframe_type='spark')

Reads a training dataset into a spark dataframe

Args:
training_dataset:
 the name of the training dataset to read
featurestore:the featurestore where the training dataset resides
training_dataset_version:
 the version of the training dataset
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
A spark dataframe with the given training dataset data
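
A minimal sketch, reusing the example training dataset AML_dataset from the module overview:

>>> from hops import featurestore
>>> # The API will default to the project's feature store and version 1 of the training dataset
>>> dataset_df = featurestore.get_training_dataset("AML_dataset")
>>> # You can also explicitly define the feature store and version:
>>> dataset_df = featurestore.get_training_dataset("AML_dataset", featurestore=featurestore.project_featurestore(), training_dataset_version=1)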
hops.featurestore.get_training_dataset_path(training_dataset, featurestore=None, training_dataset_version=1)

Gets the HDFS path to a training dataset with a specific name and version in a featurestore

Example usage:

>>> featurestore.get_training_dataset_path("AML_dataset")
>>> # By default the library will look for the training dataset in the project's featurestore and use version 1, but this can be overridden if required:
>>> featurestore.get_training_dataset_path("AML_dataset",  featurestore=featurestore.project_featurestore(), training_dataset_version=1)
Args:
training_dataset:
 name of the training dataset
featurestore:featurestore that the training dataset is linked to
training_dataset_version:
 version of the training dataset
Returns:
The HDFS path to the training dataset
hops.featurestore.get_training_dataset_tf_record_schema(training_dataset, training_dataset_version=1, featurestore=None)

Gets the tf record schema for a training dataset that is stored in tfrecords format

Args:
training_dataset:
 the training dataset to get the tfrecords schema for
training_dataset_version:
 the version of the training dataset
featurestore:the feature store where the training dataset resides
Returns:
the tf records schema
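
A minimal sketch, reusing the example training dataset AML_dataset from the module overview (assuming it was materialized in the default tfrecords format):

>>> from hops import featurestore
>>> tf_record_schema = featurestore.get_training_dataset_tf_record_schema("AML_dataset")
>>> # the returned schema dict can then be used when parsing the tfrecords files, e.g. with tf.parse_single_example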
hops.featurestore.get_training_datasets(featurestore=None)

Gets a list of all training datasets in a featurestore

>>> # List all Training Datasets in a Feature Store
>>> featurestore.get_training_datasets()
>>> # By default `get_training_datasets()` will use the project's feature store, but this can also be specified with the optional argument featurestore
>>> featurestore.get_training_datasets(featurestore=featurestore.project_featurestore())
Args:
featurestore:the featurestore to list training datasets for, defaults to the project-featurestore
Returns:
A list of names of the training datasets in this featurestore
hops.featurestore.insert_into_featuregroup(df, featuregroup, featurestore=None, featuregroup_version=1, mode='append', descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5)

Saves the given dataframe to the specified featuregroup. Defaults to the project’s featurestore. This will append to the featuregroup. To overwrite a featuregroup, create a new version of the featuregroup from the UI and append to that table.

Example usage:

>>> # The API will default to the project's feature store, featuregroup version 1, and write mode 'append'
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features")
>>> # You can also explicitly define the feature store, the featuregroup version, and the write mode (only append and overwrite are supported)
>>> featurestore.insert_into_featuregroup(sampleDf, "trx_graph_summary_features", featurestore=featurestore.project_featurestore(), featuregroup_version=1, mode="append", descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
Args:
df:the dataframe containing the data to insert into the featuregroup
featuregroup:the name of the featuregroup (hive table name)
featurestore:the featurestore to save the featuregroup to (hive database)
featuregroup_version:
 the version of the featuregroup (defaults to 1)
mode:the write mode, only ‘overwrite’ and ‘append’ are supported
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.insert_into_training_dataset(df, training_dataset, featurestore=None, training_dataset_version=1, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, corr_method='pearson', num_clusters=5, write_mode='overwrite')

Inserts the data in a training dataset from a spark dataframe (append or overwrite)

Example usage:

>>> featurestore.insert_into_training_dataset(dataset_df, "TestDataset")
>>> # By default the insert_into_training_dataset will use the project's featurestore, version 1,
>>> # and update the training dataset statistics, this configuration can be overridden:
>>> featurestore.insert_into_training_dataset(dataset_df,"TestDataset", featurestore=featurestore.project_featurestore(), training_dataset_version=1,descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
Args:
df:the dataframe to write
training_dataset:
 the name of the training dataset
featurestore:the featurestore that the training dataset is linked to
training_dataset_version:
 the version of the training dataset (defaults to 1)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:number of clusters to use for cluster analysis
corr_method:the method to compute feature correlation with (pearson or spearman)
write_mode:spark write mode (‘append’ or ‘overwrite’). Note: append is not supported for tfrecords datasets.
Returns:
None
hops.featurestore.project_featurestore()

Gets the project’s featurestore name (project_featurestore)

Returns:
the project’s featurestore name
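
A minimal usage sketch:

>>> from hops import featurestore
>>> featurestore.project_featurestore()
>>> # the returned name can be passed explicitly to other API calls:
>>> featurestore.get_featuregroups(featurestore=featurestore.project_featurestore())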
hops.featurestore.sql(query, featurestore=None, dataframe_type='spark')

Executes a generic SQL query on the featurestore

Example usage:

>>> # The API will default to the project's feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5").show(5)
>>> # You can also explicitly define the feature store
>>> featurestore.sql("SELECT * FROM trx_graph_summary_features_1 WHERE triangle_count > 5", featurestore=featurestore.project_featurestore()).show(5)
Args:
query:SQL query
featurestore:the featurestore to query, defaults to the project’s featurestore
dataframe_type:the type of the returned dataframe (spark, pandas, python or numpy)
Returns:
A dataframe with the query results
hops.featurestore.update_featuregroup_stats(featuregroup, featuregroup_version=1, featurestore=None, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, num_clusters=5, corr_method='pearson')

Updates the statistics of a featuregroup by computing the statistics with spark and then saving them to Hopsworks by making a REST call.

Example usage:

>>> # The API will default to the project's featurestore, featuregroup version 1, and compute all statistics for all columns
>>> featurestore.update_featuregroup_stats("trx_summary_features")
>>> # You can also explicitly specify featuregroup details and what statistics to compute:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True,feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for a certain set of columns, and for example exclude surrogate key columns, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_featuregroup_stats("trx_summary_features", featuregroup_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
featuregroup:the featuregroup to update the statistics for
featuregroup_version:
 the version of the featuregroup (defaults to 1)
featurestore:the featurestore where the featuregroup resides (defaults to the project’s featurestore)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use in clustering analysis (k-means)
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None
hops.featurestore.update_training_dataset_stats(training_dataset, training_dataset_version=1, featurestore=None, descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None, num_bins=20, num_clusters=5, corr_method='pearson')

Updates the statistics of a training dataset by computing the statistics with spark and then saving them to Hopsworks by making a REST call.

Example usage:

>>> # The API will default to the project's featurestore, training dataset version 1, and compute all statistics for all columns
>>> featurestore.update_training_dataset_stats("teams_prediction")
>>> # You can also explicitly specify training dataset details and what statistics to compute:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True,feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=None)
>>> # If you only want to compute statistics for a certain set of columns, and for example exclude surrogate key columns, you can use the optional argument stat_columns to specify which columns to include:
>>> featurestore.update_training_dataset_stats("teams_prediction", training_dataset_version=1, featurestore=featurestore.project_featurestore(), descriptive_statistics=True, feature_correlation=True, feature_histograms=True, cluster_analysis=True, stat_columns=['avg_trx', 'count_trx', 'max_trx', 'min_trx'])
Args:
training_dataset:
 the training dataset to update the statistics for
training_dataset_version:
 the version of the training dataset (defaults to 1)
featurestore:the featurestore where the training dataset resides (defaults to the project’s featurestore)
descriptive_statistics:
 a boolean flag whether to compute descriptive statistics (min,max,mean etc) for the featuregroup
feature_correlation:
 a boolean flag whether to compute a feature correlation matrix for the numeric columns in the featuregroup
feature_histograms:
 a boolean flag whether to compute histograms for the numeric columns in the featuregroup
cluster_analysis:
 a boolean flag whether to compute cluster analysis for the numeric columns in the featuregroup
stat_columns:a list of columns to compute statistics for (defaults to all columns that are numeric)
num_bins:number of bins to use for computing histograms
num_clusters:the number of clusters to use in clustering analysis (k-means)
corr_method:the method to compute feature correlation with (pearson or spearman)
Returns:
None

hops.hdfs module

API for interacting with the file system on Hops (HopsFS).

It is a wrapper around pydoop together with utility functions that are Hops-specific.

hops.hdfs.abs_path(hdfs_path)

Return an absolute path for hdfs_path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
Return an absolute path for hdfs_path.
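
A minimal sketch, assuming your project contains a dataset called Resources and a hypothetical file data.tfrecords in it:

>>> from hops import hdfs
>>> hdfs.abs_path("Resources/data.tfrecords")
>>> # returns the full HDFS path, something like 'hdfs://Projects/<your_project>/Resources/data.tfrecords'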
hops.hdfs.access(hdfs_path, mode, project=None)

Perform the equivalent of os.access() on path.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if access is allowed, False if not.
hops.hdfs.capacity()

Returns the raw capacity of the filesystem

Returns:
filesystem capacity (int)
hops.hdfs.chmod(hdfs_path, mode, project=None)

Change file mode bits.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
mode:File mode (user/group/world privilege) bits
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.chown(hdfs_path, user, group, project=None)

Change file owner and group.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to the given project path in HDFS).
user:New hdfs username
group:New hdfs group
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
hops.hdfs.close()

Closes the HDFS connection (disconnects from the namenode)

hops.hdfs.copy_to_hdfs(local_path, relative_hdfs_path, overwrite=False, project=None)

Copies a path (recursively) from the local filesystem to the HDFS project, from a path relative to $CWD to a path in HDFS relative to the project root (relative_hdfs_path)

For example, if you execute:

>>> copy_to_hdfs("data.tfrecords", "/Resources/", project="demo")

This will copy the file data.tfrecords to hdfs://Projects/demo/Resources/data.tfrecords

Args:
local_path:the path on the local filesystem to copy
relative_hdfs_path:
 a path in HDFS relative to the project root to where the local path should be written
overwrite:a boolean flag whether to overwrite if the path already exists in HDFS
project:name of the project, defaults to the current HDFS user’s project
hops.hdfs.copy_to_local(hdfs_path, local_path, overwrite=False, project=None)

Copies a path from HDFS project to local filesystem. If there is not enough space on the local scratch directory, an exception is thrown.

Raises:
IOError if there is not enough space to localize the file/directory in HDFS to the scratch directory ($PDIR)
Args:
local_path:the path on the local filesystem to copy the file/directory to
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
overwrite:a boolean flag whether to overwrite if the path already exists on the local filesystem
project:name of the project, defaults to the current HDFS user’s project
Returns:
the full local pathname of the file/dir
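
A minimal sketch, assuming Resources/data.tfrecords exists in your project and that local_path is resolved relative to the current working directory:

>>> from hops import hdfs
>>> local_file = hdfs.copy_to_local("Resources/data.tfrecords", "data.tfrecords")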
hops.hdfs.cp(src_hdfs_path, dest_hdfs_path)

Copy the contents of src_hdfs_path to dest_hdfs_path.

If src_hdfs_path is a directory, its contents will be copied recursively. Source file(s) are opened for reading and copies are opened for writing.

Args:
src_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest_hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.dump(data, hdfs_path)

Dumps data to a file

Args:
data:data to write to hdfs_path
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
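
A minimal sketch, writing a small string to a hypothetical file in the project's Resources dataset:

>>> from hops import hdfs
>>> hdfs.dump("hello from hops", "Resources/hello.txt")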
hops.hdfs.exists(hdfs_path, project=None)

Return True if hdfs_path exists in the default HDFS.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if hdfs_path exists.

Raises: IOError

hops.hdfs.get()

Get a handle to pydoop hdfs using the default namenode (specified in hadoop config)

Returns:
Pydoop hdfs handle
hops.hdfs.get_fs()

Get a handle to pydoop fs using the default namenode (specified in hadoop config)

Returns:
Pydoop fs handle
hops.hdfs.glob(hdfs_path, recursive=False, project=None)

Finds all the pathnames matching a specified pattern according to the rules used by the Unix shell, although results are returned in arbitrary order.

Globbing gives you the list of files in a dir that match a supplied pattern

>>> import glob
>>> glob.glob('./[0-9].*')
>>> ['./1.gif', './2.txt']

Python’s glob is implemented with os.listdir() and fnmatch.fnmatch(); we implement hdfs.glob() with hdfs.ls() and fnmatch.filter().

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Raises:
IOError if the supplied hdfs path does not exist
Returns:
A possibly-empty list of path names that match pathname, which must be a string containing a path specification. pathname can be either absolute or relative.
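
A minimal sketch, matching tfrecords files in the project's Resources dataset (the pattern is only illustrative):

>>> from hops import hdfs
>>> matches = hdfs.glob("Resources/*.tfrecords")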
hops.hdfs.isdir(hdfs_path, project=None)

Return True if path refers to a directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a directory.

Raises: IOError

hops.hdfs.isfile(hdfs_path, project=None)

Return True if path refers to a file.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
True if path refers to a file.

Raises: IOError

hops.hdfs.load(hdfs_path)

Read the content of hdfs_path and return it.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
the read contents of hdfs_path
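
A minimal sketch, reading back the hypothetical file written in the dump example above:

>>> from hops import hdfs
>>> data = hdfs.load("Resources/hello.txt")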
hops.hdfs.localize(hdfs_path)

Localizes (copies) the given file or directory from HDFS into a local scratch directory, indicated by the env variable $PDIR. Returns the absolute path for the local file. If there is not enough space on the local scratch directory, an exception is thrown.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Raises:
IOError if there is not enough space to localize the file/directory in HDFS to the scratch directory ($PDIR)
Returns:
Return an absolute path for local file/directory.
hops.hdfs.log(string)

Logs a string to the log file

Args:
string:string to log
hops.hdfs.ls(hdfs_path, recursive=False)

Lists a directory in HDFS.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
returns a list of hdfs paths
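
A minimal sketch, listing the contents of the project's Resources dataset:

>>> from hops import hdfs
>>> paths = hdfs.ls("Resources/")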
hops.hdfs.lsl(hdfs_path, recursive=False, project=None)

Returns all the pathnames in the supplied directory.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
recursive:if it is a directory and recursive is True, the list contains one item for every file or directory in the tree rooted at hdfs_path.
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
Returns:
A possibly-empty list of path names stored in the supplied path.
hops.hdfs.mkdir(hdfs_path, project=None)

Create a directory and its parents as needed.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.move(src, dest)

Move or rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.open_file(hdfs_path, project=None, flags='rw', buff_size=0)

Opens an HDFS file for read/write/append and returns a file descriptor object (fd) that should be closed when no longer needed.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
flags:Supported opening modes are ‘r’, ‘w’, ‘a’. In addition, a trailing ‘t’ can be added to specify text mode (e.g. ‘rt’ = open for reading text).
buff_size:Pass 0 as buff_size if you want to use the “configured” values, i.e. the ones set in the Hadoop configuration files.
Returns:
A file descriptor object (fd) that needs to be closed (fd.close()) when it is no longer needed.
Raises:
IOError: If the file does not exist.
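
A minimal usage sketch, assuming the returned file descriptor object supports the usual file-like write/read methods in addition to close (the path “Resources/output.txt” is hypothetical):

>>> from hops import hdfs
>>> fd = hdfs.open_file("Resources/output.txt", flags="wt")
>>> fd.write("hello hdfs\n")
>>> fd.close()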
hops.hdfs.project_name()

Extracts the project name from the project username (“project__user”)

Returns:
project name
hops.hdfs.project_path(project=None)

Get the path in HopsFS where the Hopsworks project is located. To point to a particular dataset, append the name of your dataset to this path.

>>> from hops import hdfs
>>> project_path = hdfs.project_path()
>>> print("Project path: {}".format(project_path))
Args:
project:If this value is not specified, it will get the path to your project. If you need the path to another project, you can specify the name of the project as a string.
Returns:
returns the project absolute path
hops.hdfs.project_user()

Gets the project username (“project__user”) from environment variables

Returns:
the project username
hops.hdfs.rename(src, dest)

Rename src to dest.

Args:
src:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
dest:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
hops.hdfs.rmr(hdfs_path, project=None)

Recursively remove files and directories.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to project_name in HDFS).
project:If the supplied hdfs_path is a relative path, it will look for that file in this project’s subdir in HDFS.
hops.hdfs.stat(hdfs_path)

Performs the equivalent of os.stat() on hdfs_path, returning a StatResult object.

Args:
hdfs_path:You can specify either a full hdfs pathname or a relative one (relative to your Project’s path in HDFS).
Returns:
a StatResult object with information about hdfs_path
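
A minimal usage sketch, assuming the returned StatResult mirrors os.stat_result fields such as st_size (the path is hypothetical):

>>> from hops import hdfs
>>> st = hdfs.stat("Resources/data.csv")
>>> print(st.st_size)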

hops.kafka module

A module for setting up Kafka Brokers and Consumers on the Hops platform. It hides the complexity of configuring Kafka by providing utility methods such as:

  • get_broker_endpoints().
  • get_security_protocol().
  • get_kafka_default_config().
  • etc.

Using these utility functions you can set up Kafka with the Kafka client library of your choice, e.g. Spark Streaming or confluent-kafka-python. For example, assuming that you have created a topic called “test” on Hopsworks and that you have installed confluent-kafka-python inside your project’s anaconda environment:

>>> from hops import kafka
>>> from hops import tls
>>> from confluent_kafka import Producer, Consumer
>>> TOPIC_NAME = "test"
>>> config = kafka.get_kafka_default_config()
>>> producer = Producer(config)
>>> consumer = Consumer(config)
>>> consumer.subscribe(["test"])
>>> # wait a little while before executing the rest of the code (put it in a different Jupyter cell)
>>> # so that the consumer gets a chance to subscribe (asynchronous call)
>>> def delivery_callback(err, msg):
>>>     if err is not None:
>>>         print("Message delivery failed: {}".format(err))
>>> for i in range(0, 10):
>>>     producer.produce(TOPIC_NAME, "message {}".format(i), "key", callback=delivery_callback)
>>> # Trigger the sending of all messages to the brokers, 10sec timeout
>>> producer.flush(10)
>>> for i in range(0, 10):
>>>     msg = consumer.poll(timeout=5.0)
>>>     if msg is not None:
>>>         print('Consumed Message: {} from topic: {}'.format(msg.value(), msg.topic()))
>>>     else:
>>>         print("Topic empty, timeout when trying to consume message")

Similarly, you can define a PySpark Kafka consumer as follows, using the Spark session available in the variable spark:

>>> from hops import kafka
>>> from hops import tls
>>> TOPIC_NAME = "test"
>>> df = spark.readStream.format("kafka") \
>>>     .option("kafka.bootstrap.servers", kafka.get_broker_endpoints()) \
>>>     .option("kafka.ssl.truststore.location", tls.get_trust_store()) \
>>>     .option("kafka.ssl.truststore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.keystore.location", tls.get_key_store()) \
>>>     .option("kafka.ssl.keystore.password", tls.get_key_store_pwd()) \
>>>     .option("kafka.ssl.key.password", tls.get_trust_store_pwd()) \
>>>     .option("subscribe", TOPIC_NAME) \
>>>     .load()
hops.kafka.get_broker_endpoints()

Get the Kafka broker endpoints as a comma-separated string

Returns:
a string with broker endpoints comma-separated
hops.kafka.get_broker_endpoints_list()

Get Kafka broker endpoints as a list

Returns:
a list with broker endpoint strings
hops.kafka.get_kafka_default_config()

Gets a default configuration for running secure Kafka on Hops

Returns:
a dict mapping config property -> value
hops.kafka.get_schema(topic, version_id=1)

Gets the Avro schema for a particular Kafka topic and its version.

Args:
topic:Kafka topic name
version_id:Schema version ID
Returns:
Avro schema as a string object in JSON format
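
A minimal usage sketch, assuming a topic named “test” with a registered Avro schema exists in your project:

>>> from hops import kafka
>>> import json
>>> schema_str = kafka.get_schema("test", version_id=1)
>>> schema = json.loads(schema_str)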
hops.kafka.get_security_protocol()

Gets the security protocol used for communicating with Kafka brokers in a Hopsworks cluster

Returns:
the security protocol for communicating with Kafka brokers in a Hopsworks cluster

hops.serving module

Utility functions to export models to the Models dataset and get information about models currently being served in the project.
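
For example, assuming you have saved a model under a local path such as “/tmp/my_model” (the path and model name below are hypothetical):

>>> from hops import serving
>>> serving.export("/tmp/my_model", "my_model", 1)
>>> endpoint = serving.get_serving_endpoint("my_model")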

hops.serving.export(local_model_path, model_name, model_version)

Export a model from the local filesystem to the Models dataset in your project.

Args:
local_model_path:path on the local filesystem to the model to export
model_name:name of the model
model_version:version of the model

Returns:

hops.serving.get_serving_endpoint(model, project=None)

Get the serving endpoint for a model currently being served in the project.

Args:
model:name of the model being served
project:If this value is not specified, it will use your current project. If you need the endpoint for a model served in another project, you can specify the name of the project as a string.

Returns:

hops.tensorboard module

Utility functions to manage the lifecycle of TensorBoard and get the path to write TensorBoard events.

hops.tensorboard.interactive_debugger()

Returns: address for interactive debugger in TensorBoard

hops.tensorboard.logdir()

Get the TensorBoard logdir. This function should be called in your wrapper function for Experiment, Parallel Experiment or Distributed Training and passed as the logdir for TensorBoard.

Case 1: local_logdir=True, then the logdir is on the local filesystem. Once the experiment has finished, all files present in the directory are uploaded to your experiment directory in the Experiments dataset.

Case 2: local_logdir=False, then the logdir is in HDFS, in your experiment directory in the Experiments dataset.

Returns:
The path to store files for your experiment. The content is also visualized in TensorBoard.
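
A minimal sketch of how logdir() is typically used inside a wrapper function; the training code and the use of tf.summary.FileWriter (TensorFlow 1.x) are illustrative assumptions:

>>> from hops import experiment
>>> def train():
>>>     import tensorflow as tf
>>>     from hops import tensorboard
>>>     logdir = tensorboard.logdir()
>>>     writer = tf.summary.FileWriter(logdir)
>>>     # ... run training and add summaries to the writer ...
>>>     writer.close()
>>> experiment.launch(train)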
hops.tensorboard.non_interactive_debugger()

Returns: address for non-interactive debugger in TensorBoard

hops.tensorboard.visualize(hdfs_root_logdir)

Visualize all TensorBoard events for a given path in HopsFS. This is intended for use after running TensorFlow jobs, so that they can all be visualized in the same TensorBoard. experiment.launch returns the path in HopsFS that should be passed as the argument to this function to visualize all runs.

Args:
hdfs_root_logdir:
 the path in HopsFS to enter as the logdir for TensorBoard

hops.tls module

A module for getting TLS certificates in YARN containers, used for setting up Kafka inside Jobs/Notebooks on Hops.

hops.tls.get_ca_chain_location()

Get the location of the chain of CA certificates (PEM format) required to validate the client certificate used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to ca chain of certificate
hops.tls.get_client_certificate_location()

Get the location of the client certificate (PEM format), signed by a trusted CA, used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to client certificate in PEM format
hops.tls.get_client_key_location()

Get the location of the client private key (PEM format) used for 2-way TLS authentication, for example with a Kafka cluster

Returns:
string path to client private key in PEM format
hops.tls.get_key_store()

Get keystore location

Returns:
keystore filename
hops.tls.get_key_store_cert()

Get keystore certificate from local container

Returns:
keystore certificate
hops.tls.get_key_store_pwd()

Get keystore password

Returns:
keystore password
hops.tls.get_trust_store()

Get truststore location

Returns:
truststore filename
hops.tls.get_trust_store_pwd()

Get truststore password

Returns:
truststore password

hops.util module

Miscellaneous utility functions for user applications.

hops.util.get_job_name()

If this method is called from inside a Hopsworks job, it returns the name of the job.

Returns:
the name of the hopsworks job
hops.util.grid_params(dict)

Generate all possible combinations (cartesian product) of the hyperparameter values

Args:
dict:a dict with hyperparameter names as keys, each mapping to a list of values to try for that hyperparameter
Returns:
A new dictionary with a grid of all the possible hyperparameter combinations
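
A minimal sketch (the hyperparameter names and values are hypothetical):

>>> from hops import util
>>> args_dict = {'learning_rate': [0.001, 0.01], 'dropout': [0.25, 0.5]}
>>> grid = util.grid_params(args_dict)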
hops.util.num_executors()

Get the number of executors configured for Jupyter

Returns:
Number of configured executors for Jupyter
hops.util.num_param_servers()

Get the number of parameter servers configured for Jupyter

Returns:
Number of configured parameter servers for Jupyter

Module contents