Anchor | ||||
---|---|---|---|---|
|
ステップの処理ロジックを機能させるためには、数多くのメソッドを実装しなくてはいけません。一部のメソッドは、ロウ(行)ステップか、またはキャッシュステップかに応じて異なります。
...
メソッド | 説明と例 | ||||||||||||||||||||||||||||||||||||||||||
public ETLStepAPIVersion getAPIVersion() | ステップの更新を維持するYellowfinステップAPIバージョンの定義に使用するメソッドです。これは通常、列挙型ETLStepAPIVersion内の最新のバージョンです。APIバージョンは、互換性を判断するために使用されます。
| ||||||||||||||||||||||||||||||||||||||||||
public Collection<ETLException> validate() | 事前実行検証を行うメソッドです。実装は、必須オプションが設定されているか、資格情報は正確か、ホストに到達可能かを確認しなくてはいけません。エラーは、ETLExceptionのインスタンスに取得され、メソッドから返されるコレクションに追加されます。または、ETLExceptionを構築する代わりに、便宜メソッドであるgetInvalidConfigETLException()を使用することもできます。
| ||||||||||||||||||||||||||||||||||||||||||
public Map<String, String> getValidatedStepOptions() | ステップオプションを有効にするメソッドです。optionKeyからoptionValueへのマッピングは、Yellowfinリポジトリデータベースに保存されます。こちらで、無効なオプション値を削除することができます。this.getStepOptions()により返されるマッピングの操作には影響しません。
| ||||||||||||||||||||||||||||||||||||||||||
public void setupGeneratedFields() | ステップが新規フィールドの出力を必要とする場合は、こちらのメソッドを実装します。新規フィールド内のデータは、他のフィールドを使用して生成されます。新規フィールドは既存フィールドを置き換えるか、複製する場合もあります。Yellowfinは、各操作に便宜メソッドを提供します。こちらのメソッドは、ETLStepMetadataFieldBeanの新規インスタンスを作成するか、既存フィールドを複製することが予想されます。フィールドが事前に設定されていないか、再度設定する必要がある場合にのみ、こちらのメソッドを実行することが重要です。オプション内の変更が原因でフィールドが再作成された場合は、古いフィールドを削除しなくてはいけません。削除をしない場合、ステップが再設定される度に新規フィールドが生成されることになります。
| ||||||||||||||||||||||||||||||||||||||||||
public Integer getMinInputSteps() public Integer getMaxInputSteps() public Integer getMinOutputSteps() public Integer getMaxOutputSteps() | ステップに複数のインプットやアウトプットがある場合、これらのメソッドを上書きしなくてはいけません。Yellowfinは、ステップカテゴリーに基づき最小値/最大値を返すデフォルト実装を提供します。これらの値は、これらのカテゴリーのETLStepCategory列挙型要素内で定義されます。 | ||||||||||||||||||||||||||||||||||||||||||
YFLogger | これはメソッドではありませんが、すべてのステップに共通します。ステップは、YFLoggerを使用して、データトランスフォーメーションログに書き込むことができます。これは、インスタンス変数として宣言する必要があります。YFLoggerは、log4jのロガーclassのラッパです。
|
...
ロウ(行)ステップ実装
ロウ(行)ステップは、AbstractETLRowStep classを拡張します。これには、ひとつのメソッド(processWireData())の実装のみが必要です。
processWireData()
フレームワークが、processWireData()を呼び出すと、現在のロウ(行)の各カラム(列)からのデータは、すでに適切なWire上にあります。各wireはメタデータフィールドにマッピングされ、this.getWireForField(fieldUUID)を使用してアクセスされます。データはwireから取得され、処理され、同一のwire、または別のwireに戻されます。
...
Code Block | ||||
---|---|---|---|---|
| ||||
} catch (InterruptedException e) {
throw e;
} catch (Exception e) {
log.error("Error: " + e, e);
throwUnhandledETLException(e);
} |
こちらは、特定のフィールドに数値を追加する実装の例です。
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
protected boolean processWireData(List<ETLStepMetadataFieldBean> fields)
throws ETLException, InterruptedException {
// The options should've been validated by the validate() method,
// so no need for further checks here
String appendFieldUUID = this.getStepOption("APPEND_FIELD");
String newFieldUUID = this.getStepOption("NEW_FIELD");
String appendValue = this.getStepOption("APPEND_VALUE");
Wire<Object, String> appendFieldWire = this.getWireForField(appendFieldUUID);
Wire<Object, String> newFieldWire = this.getWireForField(newFieldUUID);
Object data = appendFieldWire.getValue();
String newFieldData = null;
if (data == null) {
newFieldData = appendValue;
} else {
newFieldData = data.toString() + appendValue;
}
newFieldWire.send(newFieldData);
return true;
} |
こちらの例では、すべてのロウ(行)のデータでprocessWireData()が実行されます。実際には、後続のメソッドの呼び出しで変更されないオブジェクトは、メンバー変数にキャッシュしなくてはいけません。例えば、appendFieldUUID、newFieldUUID、appendValue、appnedFieldWire、newFieldWireはメンバー変数にすべきであり、processWoreData()が初めて実行された時にだけ設定されます。
キャッシュステップ実装
キャッシュステップは、AbstractETLCachedStepを拡張します。ひとつのメソッド(processEndRows())のみ実装しなくてはいけません。キャッシュステップは通常、ひとつ以上のインプットステップを持ちます。データ取得ステップはインプットを持たないため、キャッシュステップとして実装されることが多いです。例えばこれは、SQLクエリーの実行によりデータを生成します。
processEndRow()
- インプットステップ:フレームワークは、プロセスが実行が開始されるとすぐに、インプットステップにprocessEndRows()を呼び出します。インプットステップは、データキャッシングを気にする必要がありません。しかし、実装はwireにデータを配置し、その出力へ送らなくてはいけません。こちらは、データ生成ステップのメソッドの実装例です。
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
protected void processEndRows() throws ETLException, InterruptedException {
// Get the first output flow;
// Useful for most steps which have a single output
String outFlow = getFirstOutputFlow();
// The step outputs four Generated fields.
List<String> orderedFieldUUIDs = new ArrayList<>(4);
// The step's implementation of setupGeneratedFields()
// should set these up. Their UUIDs would've been saved as step options.
orderedFieldUUIDs.add(getStepOption("FIELD1_UUID"));
orderedFieldUUIDs.add(getStepOption("FIELD2_UUID"));
orderedFieldUUIDs.add(getStepOption("FIELD3_UUID"));
orderedFieldUUIDs.add(getStepOption("FIELD4_UUID"));
// Sample Data
String[] field1_data = {"Adventure", "Relaxation", "Culture", "Family"};
int[] field2_data = {30, 32, 11, 44};
Date[] field3_data = {new Date(103882823L), new Date(10388283323L),
new Date(103883232823L), new Date(102323882823L)};
Timestamp[] field4_data = {new Timestamp(103882823L), new Timestamp(10388283323L),
new Timestamp(103883232823L), new Timestamp(102323882823L)};
// Generate as many rows as configured in Step Option ROW_COUNT
int rowCount = 10;//Integer.parseInt(getStepOption("ROW_COUNT"));
Random random = new Random();
for (int i = 0 ; i < rowCount ; i++) {
// Data is emitted in packets.
// This implementation creates a new packet for every row.
// Data packets can accumulate rows and emit, say, every 20 rows.
ETLStepResult dataPacket = getFreshDataPacket(outFlow);
Object[] row = new Object[4];
row[0] = field1_data[random.nextInt(4)];
row[1] = field2_data[random.nextInt(4)];
row[2] = field3_data[random.nextInt(4)];
row[3] = field4_data[random.nextInt(4)];
// Send the row of data from Default Fields to Output Fields
beginInternalTransmission(row, orderedFieldUUIDs);
// Accumulate transmitted data in a data packet
endInternalTransmission(dataPacket);
// Emit the packet of data to the next step.
// This may be done less frequently, after accumulating rows
emitData(dataPacket);
}
} |
- トランスフォーメーションステップ:フレームワークは、すべてのインプットがキャッシュステップへのデータ送信を終了した時に、キャッシュトランスフォーメーションステップにprocessEndRows()を呼び出します。各インプットステップのデータは、異なるメモリーキャッシュに保存されます。ステップ実装は必ずwireにデータを送信し、ステップからのデータをそのアウトプットへ送らなくてはいけません。以下の例は、Union-Allステップのprocを実装し、データキャッシュがどのように使用されるのかを示しています。
Code Block | ||||
---|---|---|---|---|
| ||||
@Override
protected void processEndRows() throws ETLException, InterruptedException {
// Get input flows which feed data to this step
Set<String> inputFlowUuids = this.getInputFlowUuids();
// Get the output flow as there can be only one
String outFlowUuid = getFirstOutputFlow();
// Get a data packet
ETLStepResult dataPacket = getFreshDataPacket(outFlowUuid);
for (String inputFlowUuid : inputFlowUuids) {
// Get data of each input from its cache
ETLDataCache inputData = getDataCache(inputFlowUuid);
// Use this to get the Default Metadata Field corresponding to an Input Field.
// Input Metadata Field is the field in the input step.
// Cached data will be in the order of input fields.
Map<String, String> inputToDefaultFieldMap = getInputToDefaultFieldMap();
// The data will match the Input Metadata Fields
List<ETLStepMetadataFieldBean> inputFieldList = inputData.getMetadataFields();
List<String> unionResultFields = new ArrayList<String>();
for(ETLStepMetadataFieldBean fieldBean : inputFieldList){
// Get the Default Metadata Field for an Input Metadata Field
String inputFieldUuid = fieldBean.getEtlStepMetadataFieldUUID();
String defaultFieldUuid = inputToDefaultFieldMap.get(inputFieldUuid);
// Get the Generated Default Metadata Field holding the result of the Union.
// getUnionFieldForDefaultField() is a method defined in the Step.
// The result of the union operation is sent to new generated fields.
// It figures out how a Default Field is linked to the generated field.
String unionResultField = getUnionFieldForDefaultField(defaultFieldUuid);
// This holds the Union field corresponding to the Input Field.
// Fields which are excluded from the union will have a null entry.
unionResultFields.add(unionResultField);
}
// Iterate through the cached data
Iterator<Object[]> it = inputData.iterator();
while (it.hasNext()) {
Object[] row = it.next();
// Transmit data from:
// input fields -> default -> generated default (union fields) -> output
// Data excluded from the union will have a "null" field,
// so nothing will be transmitted.
this.beginInternalTransmission(row, unionResultFields);
this.endInternalTransmission(dataPacket);
}
}
// Accumulate all data before emitting to the next step
emitData(dataPacket);
} |
...