Commit 7ddfd541 authored by Staiger

Adding HPC training with words workflow and jupyter notebook for demo.

parent a0f3bd05
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Wordcount - an example of how to work on a compute cluster with data stored in iRODS\n",
"\n",
"## Imports\n",
"- Standard Python modules for file operations and timestamps\n",
"- Our own library of helper functions\n",
"- The iRODS modules needed to connect to iRODS and to work with data objects, collections and metadata in iRODS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import datetime\n",
"from helperFunctions import *\n",
"\n",
"from irods.session import iRODSSession\n",
"from irods.models import Collection, DataObject, CollectionMeta, DataObjectMeta"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Connecting to iRODS"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#PARAMETERS\n",
"# iRODS connection\n",
"host='scomp1447.wurnet.nl'\n",
"port=1247\n",
"user='irods-user1'\n",
"zone='aliceZone'\n",
"\n",
"# get password from file\n",
"with open('passwd', 'r') as f:\n",
" passwd = f.readline().strip()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Parameters for our computational pipeline\n",
"- Keywords and their values to search for the correct data in iRODS\n",
"- The folder structure on the fast storage of the compute cluster.\n",
"  The data stored here is **not backed up** and not stored safely; this storage is only used to allow very quick calculations on the data."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# data search\n",
"ATTR_NAME = 'AUTHOR'\n",
"ATTR_VALUE = 'Lewis Carroll'"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Creating local directories for analysis and results')\n",
"dataDir = '/lustre/scratch/GUESTS/staig001/wordcountData'\n",
"ensure_dir(dataDir)\n",
"resultsDir = '/lustre/scratch/GUESTS/staig001/wordcountResults'\n",
"ensure_dir(resultsDir)\n",
"print('/lustre/scratch/GUESTS/staig001')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Connect to iRODS '+ zone)\n",
"session = iRODSSession(host=host, port=port, user=user, password=passwd, zone=zone)\n",
"print('You have access to: ')\n",
"colls = [coll.path for coll in session.collections.get('/'+zone+'/'+'home').subcollections]\n",
"print(colls)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Search for your input data"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Searching for files')\n",
"query = session.query(Collection.name, DataObject.name)\n",
"filteredQuery = query.filter(DataObjectMeta.name == ATTR_NAME).\\\n",
" filter(DataObjectMeta.value == ATTR_VALUE)\n",
"print(filteredQuery.all())\n",
"iPaths = iParseQuery(filteredQuery)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Prepare data for analysis\n",
"To have a look inside the data, we have two options in iRODS:\n",
"1. We download the data to our fast storage system, so that it is available and ready to be read from there.\n",
"2. In some cases single files are too large to be downloaded quickly, or even too large to fit into the memory of the machine you are working on. In that case we can stream files into memory, i.e. read a file bit by bit or read only the interesting parts.\n",
"\n",
"In this tutorial we will continue with option 1:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Downloading: ')\n",
"print('\\n'.join(iPaths))\n",
"iGetList(session, iPaths, dataDir)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Start your computational pipeline"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Start wordcount')\n",
"dataFiles = [dataDir+'/'+f for f in os.listdir(dataDir)]\n",
"resFile = wordcount(dataFiles,resultsDir)\n",
"print('Results of calculations:', resFile)\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"What have we actually calculated?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(resFile, 'r') as f:\n",
" print(f.readlines())"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Note:** our results are stored on the fast but not safe storage! We need to upload them to iRODS quickly!\n",
"## Uploading your data to safe storage through iRODS and annotating the results"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"coll = session.collections.get('/' + zone + '/home/' + user)\n",
"objNames = [obj.name for obj in coll.data_objects]\n",
"f = os.path.basename(resFile)\n",
"# Little trick to prevent overwriting data: if the file name already exists in iRODS, we extend it with a number\n",
"count = 0\n",
"while f in objNames:\n",
"    f = os.path.basename(resFile) + '_' + str(count)\n",
"    count = count + 1"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Upload results to: ', coll.path + '/' + f)\n",
"session.data_objects.put(resFile, coll.path + '/' + f)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now we can annotate the data in iRODS, so that we know later where it came from:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Adding metadata to', coll.path + '/' + f)\n",
"obj = session.data_objects.get(coll.path + '/' + f)\n",
"for iPath in iPaths:\n",
" obj.metadata.add('prov:wasDerivedFrom', iPath)\n",
"\n",
"obj.metadata.add('ISEARCH', ATTR_NAME + '==' + ATTR_VALUE)\n",
"obj.metadata.add('ISEARCHDATE', str(datetime.date.today()))\n",
"obj.metadata.add('prov:SoftwareAgent', 'wordcount.py')\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Last check: How is the file annotated in iRODS?"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('Metadata for: ', coll.path + '/' + f)\n",
"print('\\n'.join([item.name +' \\t'+ item.value for item in obj.metadata.items()]))\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Remove temporary data from scratch"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "python 3.8.5",
"language": "python",
"name": "python3.8.5"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
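The last notebook section, "Remove temporary data from scratch", ends in an empty code cell. A minimal sketch of what such a cleanup could look like is shown here, under the assumption that the scratch directories hold nothing except the downloaded input and results that have already been uploaded to iRODS. `remove_scratch_dirs` is a hypothetical helper name; it is not part of `helperFunctions.py`.

```python
import os
import shutil
import tempfile


def remove_scratch_dirs(*directories):
    """Delete each given directory tree and return the paths that were removed.

    Hypothetical helper: call it only after the results are safely in iRODS.
    Paths that do not exist are skipped silently.
    """
    removed = []
    for directory in directories:
        if os.path.isdir(directory):
            shutil.rmtree(directory)
            removed.append(directory)
    return removed


if __name__ == "__main__":
    # Demo on a throwaway directory instead of the real scratch paths.
    base = tempfile.mkdtemp()
    data_dir = os.path.join(base, "wordcountData")
    os.makedirs(data_dir)
    print(remove_scratch_dirs(data_dir, os.path.join(base, "doesNotExist")))
    shutil.rmtree(base)
```

In the notebook this would presumably be called with `dataDir` and `resultsDir` once the metadata check has confirmed that the upload succeeded.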
"""
@licence: Apache 2.0
@Copyright (c) 2017, Christine Staiger (SURFsara)
@author: Christine Staiger
"""
import os
#wordcount
from collections import Counter
import json
import string
#irods
import irods.keywords as kw
#create directory if it does not exist already
def ensure_dir(directory):
    if not os.path.exists(directory):
        os.makedirs(directory)
#wordcount program
#simple wordcount function which counts every instance of a word; note that it is case sensitive
def wordcount(dataFiles, resultsDir):
    print(dataFiles)
    print(resultsDir)
    words = []
    for path in dataFiles:
        with open(path) as f:
            text = f.read().split()
        #strip punctuation characters from every word
        newWords = [''.join(char for char in word
                            if char not in string.punctuation) for word in text]
        words.extend(newWords)
    #total number of words over all files
    print(len(words))
    numWords = Counter(words)
    #store results
    resultsFile=resultsDir+"/resultswordcount.dat"
    with open(resultsFile, 'w') as file:
        file.write(json.dumps(numWords))
    return resultsFile
#irods helper functions
def iParseQuery(queryResults):
    """
    Parse a query that fetched Collection.name and DataObject.name; the function builds the full iRODS path of every file returned.
    Usage example:
    iParseQuery(sess.query(Collection.name, DataObject.name))
    iParseQuery(sess.query(Collection.name, DataObject.name).filter(DataObjectMeta.name == 'author').filter(DataObjectMeta.value == 'Lewis Carroll'))
    """
    iPaths = []
    results = queryResults.get_results()
    for item in results:
        #each result row holds one collection name and one data object name
        for k in item.keys():
            if k.icat_key == 'DATA_NAME':
                name = item[k]
            elif k.icat_key == 'COLL_NAME':
                coll = item[k]
            else:
                continue
        iPaths.append(coll+'/'+name)
    return iPaths
def iGetList(sess, iPaths, destFolder):
    """
    Downloads a list of data objects from iRODS and saves them in the destination folder.
    Watch out: Existing files in the destination folder will be overwritten!
    Example usage:
    iGetList(sess, ['/aliceZone/home/irods-user1/myiPyFun.py'], '/home/user1/dataFilesToCompute')
    Parameters:
    sess - iRODS session
    iPaths - List of full iRODS paths to data objects
    destFolder - Location, unix filesystem
    """
    ensure_dir(destFolder)
    print("Write to: ", destFolder)
    for iPath in iPaths:
        #read the whole data object into memory and close the iRODS handle afterwards
        with sess.data_objects.open(iPath, 'r') as objDesc:
            buff = objDesc.read()
        with open(destFolder+'/'+os.path.basename(iPath), 'wb') as f:
            f.write(buff)
def iPutFile(sess, fileName, iPath):
    """
    Uploads a file to iRODS and returns the iRODS data object.
    Watch out: Returns an error if data object already exists.
    Example usage:
    myObj = iPutFile(sess, "/home/user1/pythonscripts/helperFunctions.py", "/aliceZone/home/irods-user1/myiPyFun.py")
    Parameters:
    sess - iRODS session
    fileName - Full path to file, unix filesystem
    iPath - Full iRODS path to destination data object
    """
    options = {kw.REG_CHKSUM_KW: ''}
    #read the file as bytes, since the iRODS data object is written in binary mode
    with open(fileName, 'rb') as f:
        content = f.read()
    obj = sess.data_objects.create(iPath)
    #the options are passed on as keyword arguments
    with obj.open('w', **options) as obj_desc:
        obj_desc.write(content)
    obj = sess.data_objects.get(iPath)
    return obj
def iLsColl(sess, iPath):
    """
    Lists the whole iRODS collection recursively.
    Example usage:
    iLsColl(sess, '/aliceZone/home/irods-user1')
    Parameters:
    sess - iRODS session
    iPath - Full iRODS path to the iRODS collection
    """
    iColl = sess.collections.get(iPath)
    for srcColl, subColls, objs in iColl.walk():
        print("-C "+srcColl.path)
        print("\n".join([" "+obj.path for obj in objs]))
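The notebook mentions a second option besides downloading: streaming a file into memory bit by bit. `iGetList` and `wordcount` above always read whole files. The following is a minimal sketch of a chunked word count over any binary file-like object. The names `count_words_streamed` and `_clean` are hypothetical and not part of the helper module, and the demo uses `io.BytesIO` as a stand-in; with iRODS the stream would presumably be the handle returned by `sess.data_objects.open(iPath, 'r')`.

```python
import io
import string
from collections import Counter


def _clean(word_bytes, encoding):
    """Decode one word and strip punctuation, as the wordcount helper does."""
    word = word_bytes.decode(encoding, errors="replace")
    return "".join(ch for ch in word if ch not in string.punctuation)


def count_words_streamed(stream, chunk_size=1024 * 1024, encoding="utf-8"):
    """Count words from a binary stream without loading it completely.

    Words are split on ASCII whitespace. A word that is cut off at the end
    of a chunk is kept back and completed with the next chunk.
    """
    counts = Counter()
    leftover = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        data = leftover + chunk
        pieces = data.split()
        # If the data does not end in whitespace, the last piece may be incomplete.
        if pieces and not data[-1:].isspace():
            leftover = pieces.pop()
        else:
            leftover = b""
        counts.update(_clean(piece, encoding) for piece in pieces)
    if leftover:
        counts[_clean(leftover, encoding)] += 1
    return counts


if __name__ == "__main__":
    demo = io.BytesIO(b"Alice was beginning to get very tired, Alice!")
    print(count_words_streamed(demo, chunk_size=8))
```

Only one chunk plus at most one unfinished word is held in memory at a time, so the memory footprint depends on `chunk_size` and on the number of distinct words, not on the file size.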
import os
import datetime
from helperFunctions import *
from irods.session import iRODSSession
from irods.models import Collection, DataObject, CollectionMeta, DataObjectMeta
#PARAMETERS
# iRODS connection
host='<iRODS host>'
port=1247
user='<your username>'
password='<your password>'
zone='aliceZone'
# data search
ATTR_NAME = 'AUTHOR'
ATTR_VALUE = 'Lewis Carroll'
print('Creating local directories for analysis and results')
dataDir = '/lustre/scratch/GUESTS/staig001/wordcountData'
ensure_dir(dataDir)
resultsDir = '/lustre/scratch/GUESTS/staig001/wordcountResults'
ensure_dir(resultsDir)
print('Connect to iRODS')
session = iRODSSession(host=host, port=port, user=user, password=password, zone=zone)
print('You have access to: ')
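# subcollections is an attribute, not a method; collect the paths as in the notebook
colls = [coll.path for coll in session.collections.get('/'+zone+'/'+'home').subcollections]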
print(colls)
print('Searching for files')
query = session.query(Collection.name, DataObject.name)
filteredQuery = query.filter(DataObjectMeta.name == ATTR_NAME).\
    filter(DataObjectMeta.value == ATTR_VALUE)
print(filteredQuery.all())
iPaths = iParseQuery(filteredQuery)
print('Downloading: ')
print('\n'.join(iPaths))
iGetList(session, iPaths, dataDir)
print('Start wordcount')
dataFiles = [dataDir+'/'+f for f in os.listdir(dataDir)]
resFile = wordcount(dataFiles,resultsDir)
#upload
coll = session.collections.get('/' + zone + '/home/' +user)
objNames = [obj.name for obj in coll.data_objects]
f = os.path.basename(resFile)
count = 0
while f in objNames:
    f = os.path.basename(resFile) + '_' + str(count)
    count = count + 1
print('Upload results to: ', coll.path + '/' + f)
session.data_objects.put(resFile, coll.path + '/' + f)
print('Adding metadata')
obj = session.data_objects.get(coll.path + '/' + f)
for iPath in iPaths:
    obj.metadata.add('prov:wasDerivedFrom', iPath)
obj.metadata.add('ISEARCH', ATTR_NAME + '==' + ATTR_VALUE)
obj.metadata.add('ISEARCHDATE', str(datetime.date.today()))
obj.metadata.add('prov:SoftwareAgent', 'wordcount.py')
print('Metadata for: ', coll.path + '/' + f)
print('\n'.join([item.name +' \t'+ item.value for item in obj.metadata.items()]))
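The upload step avoids overwriting an existing data object by extending the file name with a number. The same idea can be isolated in a small function that is easy to test without an iRODS connection. This is a sketch only: `unique_object_name` is a hypothetical name, and as a deliberate variation on the script it places the number before the file extension (`resultswordcount_0.dat`) instead of after it (`resultswordcount.dat_0`).

```python
import os


def unique_object_name(file_name, existing_names):
    """Return file_name, or a numbered variant that is not in existing_names.

    Hypothetical helper; the number is inserted before the extension.
    """
    taken = set(existing_names)
    if file_name not in taken:
        return file_name
    stem, ext = os.path.splitext(file_name)
    count = 0
    # Try stem_0.ext, stem_1.ext, ... until a free name is found.
    while f"{stem}_{count}{ext}" in taken:
        count += 1
    return f"{stem}_{count}{ext}"


if __name__ == "__main__":
    names = ["resultswordcount.dat", "resultswordcount_0.dat"]
    print(unique_object_name("resultswordcount.dat", names))
```

In the script, `existing_names` would be the `objNames` list built from `coll.data_objects`. The check and the upload are two separate steps, so a name that is free at check time could still be taken by another upload before `put` runs.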